Skip to content

Commit

Permalink
Merge branch 'develop' into feature/canonicalKey
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Aug 12, 2024
2 parents 3e41e96 + 3e0a63c commit afded53
Show file tree
Hide file tree
Showing 34 changed files with 448 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ list( APPEND fdb5_srcs
api/helpers/PurgeIterator.h
api/helpers/StatsIterator.cc
api/helpers/StatsIterator.h
api/helpers/ArchiveCallback.h
api/helpers/Callback.h

api/local/QueryVisitor.h
api/local/QueueStringLogTarget.h
Expand Down
12 changes: 12 additions & 0 deletions src/fdb5/LibFdb5.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ const Config& LibFdb5::defaultConfig(const eckit::Configuration& userConfig) {
return *config_;
}

/// Returns, by value, the callback currently held in constructorCallback_.
ConstructorCallback LibFdb5::constructorCallback() {
    ConstructorCallback registered = constructorCallback_;
    return registered;
}

/// Stores `cb` in constructorCallback_, replacing whatever callback was held before.
/// @param cb  callable taking an FDB&; copied into the library instance.
void LibFdb5::registerConstructorCallback(ConstructorCallback cb) {
    this->constructorCallback_ = cb;
}

bool LibFdb5::dontDeregisterFactories() const {
#if eckit_VERSION_MAJOR > 1 || (eckit_VERSION_MAJOR == 1 && (eckit_VERSION_MINOR > 17 || (eckit_VERSION_MINOR == 17 && eckit_VERSION_PATCH >0)))
return eckit::LibEcKit::instance().dontDeregisterFactories();
Expand All @@ -74,6 +82,10 @@ RemoteProtocolVersion LibFdb5::remoteProtocolVersion() const {
}


/// Returns the set of auxiliary extension names.
/// The set is a function-local static, so it is built once (on the first call)
/// from the eckit resource "$FDB_AUX_EXTENSIONS;fdbAuxExtensions", with
/// {"gribjump"} supplied as the default value.
/// NOTE(review): presumably the resource string names an environment variable
/// and a configuration key -- confirm against eckit::Resource.
const std::set<std::string>& LibFdb5::auxiliaryRegistry() {
    using Extensions = std::set<std::string>;
    static Extensions registry(eckit::Resource<Extensions>("$FDB_AUX_EXTENSIONS;fdbAuxExtensions", {"gribjump"}));
    return registry;
}
//----------------------------------------------------------------------------------------------------------------------

static unsigned getUserEnvRemoteProtocol() {
Expand Down
8 changes: 8 additions & 0 deletions src/fdb5/LibFdb5.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "fdb5/database/DB.h"
#include "fdb5/types/TypesRegistry.h"
#include "fdb5/api/helpers/Callback.h"

namespace fdb5 {

Expand Down Expand Up @@ -75,13 +76,20 @@ class LibFdb5 : public eckit::system::Library {

bool dontDeregisterFactories() const;

void registerConstructorCallback(ConstructorCallback cb);

ConstructorCallback constructorCallback();

static const std::set<std::string>& auxiliaryRegistry();

protected:
virtual std::string version() const;

virtual std::string gitsha1(unsigned int count) const;

private:
std::unique_ptr<Config> config_;
ConstructorCallback constructorCallback_ = CALLBACK_CONSTRUCTOR_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
17 changes: 13 additions & 4 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "eckit/message/Reader.h"
#include "eckit/utils/StringTools.h"

#include "eckit/system/Plugin.h"
#include "eckit/system/LibraryManager.h"

#include "metkit/hypercube/HyperCubePayloaded.h"

#include "fdb5/LibFdb5.h"
Expand All @@ -39,7 +42,9 @@ namespace fdb5 {
FDB::FDB(const Config &config) :
internal_(FDBFactory::instance().build(config)),
dirty_(false),
reportStats_(config.getBool("statistics", false)) {}
reportStats_(config.getBool("statistics", false)) {
LibFdb5::instance().constructorCallback()(*this);
}


FDB::~FDB() {
Expand Down Expand Up @@ -284,11 +289,11 @@ void FDB::print(std::ostream& s) const {

void FDB::flush() {
if (dirty_) {

eckit::Timer timer;
timer.start();

internal_->flush();
flushCallback_();
dirty_ = false;

timer.stop();
Expand Down Expand Up @@ -322,8 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const {
return internal_->enabled(controlIdentifier);
}

void FDB::registerCallback(ArchiveCallback callback) {
internal_->registerCallback(callback);
// Hands `callback` on to the underlying implementation object (internal_).
// todo rename
void FDB::registerArchiveCallback(ArchiveCallback callback) {
    this->internal_->registerArchiveCallback(callback);
}

// Stores `callback` in flushCallback_, replacing whatever callback was held before.
// todo rename
void FDB::registerFlushCallback(FlushCallback callback) {
    this->flushCallback_ = callback;
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "fdb5/api/helpers/WipeIterator.h"
#include "fdb5/api/helpers/MoveIterator.h"
#include "fdb5/config/Config.h"
#include "fdb5/api/helpers/ArchiveCallback.h"
#include "fdb5/api/helpers/Callback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -119,7 +119,8 @@ class FDB {

bool dirty() const;

void registerCallback(ArchiveCallback callback);
void registerArchiveCallback(ArchiveCallback callback);
void registerFlushCallback(FlushCallback callback);

// -------------- API management ----------------------------

Expand Down Expand Up @@ -155,6 +156,7 @@ class FDB {

FDBStats stats_;

FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "fdb5/api/helpers/PurgeIterator.h"
#include "fdb5/api/helpers/StatsIterator.h"
#include "fdb5/api/helpers/StatusIterator.h"
#include "fdb5/api/helpers/ArchiveCallback.h"
#include "fdb5/api/helpers/Callback.h"

namespace eckit {
namespace message {
Expand Down Expand Up @@ -91,7 +91,7 @@ class FDBBase : private eckit::NonCopyable {

virtual AxesIterator axes(const FDBToolRequest& request, int axes) { NOTIMP; }

void registerCallback(ArchiveCallback callback) {callback_ = callback;}
void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;}

// -------------- API management ----------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
*/

#pragma once

#include <future>
#include "fdb5/database/Key.h"
#include "fdb5/database/FieldLocation.h"

namespace fdb5 {

using ArchiveCallback = std::function<void(const Key&, const FieldLocation&)>;
class FDB;

using ArchiveCallback = std::function<void(const Key& key, const void* data, size_t length, std::future<std::shared_ptr<FieldLocation>>)>;
using FlushCallback = std::function<void()>;
using ConstructorCallback = std::function<void(FDB&)>;

static const ArchiveCallback CALLBACK_NOOP = [](const Key&, const FieldLocation&) {};
static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future<std::shared_ptr<FieldLocation>>) {};
static const FlushCallback CALLBACK_FLUSH_NOOP = []() {};
static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {};

} // namespace fdb5
2 changes: 2 additions & 0 deletions src/fdb5/api/local/PurgeVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ void PurgeVisitor::visitDatum(const Field& field, const std::string& keyFingerpr
void PurgeVisitor::catalogueComplete(const Catalogue& catalogue) {
internalVisitor_->catalogueComplete(catalogue);

internalVisitor_->gatherAuxiliaryURIs();

if (!porcelain_) {
internalVisitor_->report(out_);
}
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/daos/DaosCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ const Index& DaosCatalogueWriter::currentIndex() {
/// @todo: other writers may be simultaneously updating the axes KeyValues in DAOS. Should these
/// new updates be retrieved and put into in-memory axes from time to time, e.g. every
/// time a value is put in an axis KeyValue?
void DaosCatalogueWriter::archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) {
void DaosCatalogueWriter::archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) {

if (current_.null()) {
ASSERT(!currentIndexKey_.empty());
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/daos/DaosCatalogueWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter {
void clean() override;
void close() override;

void archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) override;
void archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) override;

virtual void print( std::ostream &out ) const override { NOTIMP; }

Expand Down
4 changes: 4 additions & 0 deletions src/fdb5/daos/DaosStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class DaosStore : public Store, public DaosCommon {

void checkUID() const override { /* nothing to do */ }

// DAOS store does not currently support auxiliary objects
std::vector<eckit::URI> getAuxiliaryURIs(const eckit::URI&) const override { return {}; }
bool auxiliaryURIExists(const eckit::URI&) const override { return false; }

protected: // methods

std::string type() const override { return "daos"; }
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class CatalogueReader {
class CatalogueWriter {
public:
virtual const Index& currentIndex() = 0;
virtual void archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) = 0;
virtual void archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) = 0;
virtual void overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) = 0;
virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0;
virtual void reconsolidate() = 0;
Expand Down
10 changes: 7 additions & 3 deletions src/fdb5/database/DB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "fdb5/database/DB.h"
#include "fdb5/database/Field.h"
#include "fdb5/toc/TocEngine.h"
#include "fdb5/api/helpers/ArchiveCallback.h"

using eckit::Log;

Expand Down Expand Up @@ -110,9 +109,14 @@ void DB::archive(const Key& key, const void* data, eckit::Length length, const K
ASSERT(cat);

const Index& idx = cat->currentIndex();
std::unique_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

callback(field, *location);
std::shared_ptr<FieldLocation> location(store().archive(idx.key(), data, length));

// In anticipation of store().archive() working asynchronously in later FDB versions.
std::promise<std::shared_ptr<FieldLocation>> promise;
promise.set_value(location);

callback(field, data, length, promise.get_future());

cat->archive(key, std::move(location));
}
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/DB.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "fdb5/database/EntryVisitMechanism.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/Store.h"
#include "fdb5/api/helpers/ArchiveCallback.h"
#include "fdb5/api/helpers/Callback.h"

namespace eckit {
class DataHandle;
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Field.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace fdb5 {

Field::Field() {}

Field::Field(std::unique_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details):
Field::Field(std::shared_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details):
location_(std::move(location)),
timestamp_(timestamp),
details_(details) {
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Field.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Field {

Field();

Field(std::unique_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details = FieldDetails());
Field(std::shared_ptr<FieldLocation> location, time_t timestamp, const FieldDetails& details = FieldDetails());
Field(const FieldLocation&& location, time_t timestamp, const FieldDetails& details = FieldDetails());

eckit::DataHandle* dataHandle() const { return location_->dataHandle(); }
Expand Down
2 changes: 2 additions & 0 deletions src/fdb5/database/PurgeVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class PurgeVisitor : public virtual StatsReportVisitor {
virtual void report(std::ostream& out) const = 0;

virtual void purge(std::ostream& out, bool porcelain, bool doit) const = 0;

virtual void gatherAuxiliaryURIs() {} // NOOP by default
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions src/fdb5/database/Store.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class Store {
virtual std::vector<eckit::URI> collocatedDataURIs() const = 0;
virtual std::set<eckit::URI> asCollocatedDataURIs(const std::vector<eckit::URI>&) const = 0;

virtual std::vector<eckit::URI> getAuxiliaryURIs(const eckit::URI&) const { NOTIMP; }
virtual bool auxiliaryURIExists(const eckit::URI&) const { NOTIMP; }

protected: // members
const Schema& schema_; //<< schema is owned by catalogue which always outlives the store
};
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/toc/TocCatalogueWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con
return TocCatalogue::enabled(controlIdentifier);
}

void TocCatalogueWriter::archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) {
void TocCatalogueWriter::archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) {
dirty_ = true;

if (current_.null()) {
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/toc/TocCatalogueWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter {
void clean() override;
void close() override;

void archive(const Key& key, std::unique_ptr<FieldLocation> fieldLocation) override;
void archive(const Key& key, std::shared_ptr<FieldLocation> fieldLocation) override;
void reconsolidateIndexesAndTocs();

virtual void print( std::ostream &out ) const override;
Expand Down
51 changes: 51 additions & 0 deletions src/fdb5/toc/TocPurgeVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,32 @@ bool TocPurgeVisitor::visitDatabase(const Catalogue& catalogue, const Store& sto
return true;
}

void TocPurgeVisitor::gatherAuxiliaryURIs() {
for (const auto& it : dataUsage_) { // <std::string, size_t>

// Check if .data file is deletable
bool deletable = false;
if (it.second == 0) {
eckit::PathName path(it.first);
if (store_.uriBelongs(eckit::URI(store_.type(), path))) {
deletable = true;
}
}

// Add auxiliary files to the corresponding set
eckit::URI uri(store_.type(), eckit::PathName(it.first));
for (const auto& auxURI : store_.getAuxiliaryURIs(uri)) {
if (!store_.auxiliaryURIExists(auxURI)) continue;
// Todo: in future can we just use URIs, not paths?
eckit::PathName auxPath = auxURI.path();
if (deletable) {
deleteAuxFiles_.insert(auxPath);
} else {
keepAuxFiles_.insert(auxPath);
}
}
}
}

void TocPurgeVisitor::report(std::ostream& out) const {

Expand Down Expand Up @@ -112,8 +138,27 @@ void TocPurgeVisitor::report(std::ostream& out) const {
if (!cnt2) {
out << " - NONE -" << std::endl;
}
out << std::endl;

// Auxiliary files
out << "Auxiliary files to be deleted:" << std::endl;
for (const auto& it : deleteAuxFiles_) {
out << " " << it << std::endl;
}
if (deleteAuxFiles_.empty()) {
out << " - NONE -" << std::endl;
}
out << std::endl;

out << "Auxiliary files to be kept:" << std::endl;
for (const auto& it : keepAuxFiles_) {
out << " " << it << std::endl;
}
if (keepAuxFiles_.empty()) {
out << " - NONE -" << std::endl;
}
out << std::endl;

size_t cnt3 = 0;
out << "Index files to be deleted:" << std::endl;
for (const auto& it : indexUsage_) { // <std::string, size_t>
Expand Down Expand Up @@ -163,6 +208,12 @@ void TocPurgeVisitor::purge(std::ostream& out, bool porcelain, bool doit) const
}
}

for (const auto& path : deleteAuxFiles_) {
if (path.dirName().sameAs(directory) && keepAuxFiles_.find(path) == keepAuxFiles_.end()) {
store_.remove(eckit::URI(store_.type(), path), logAlways, logVerbose, doit);
}
}

for (const auto& it : indexUsage_) { // <std::string, size_t>
if (it.second == 0) {
eckit::PathName path(it.first);
Expand Down
Loading

0 comments on commit afded53

Please sign in to comment.