Skip to content

Commit

Permalink
fdb read returns a seekable handle
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Mar 8, 2024
1 parent ec5e557 commit b5c2a77
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 54 deletions.
2 changes: 2 additions & 0 deletions src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ list( APPEND fdb5_srcs
io/LustreFileHandle.h
io/HandleGatherer.cc
io/HandleGatherer.h
io/FieldHandle.cc
io/FieldHandle.h
rules/MatchAlways.cc
rules/MatchAlways.h
rules/MatchAny.cc
Expand Down
56 changes: 2 additions & 54 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "fdb5/api/helpers/FDBToolRequest.h"
#include "fdb5/database/Key.h"
#include "fdb5/io/HandleGatherer.h"
#include "fdb5/io/FieldHandle.h"
#include "fdb5/message/MessageDecoder.h"

namespace fdb5 {
Expand Down Expand Up @@ -140,13 +141,6 @@ bool FDB::sorted(const metkit::mars::MarsRequest &request) {
return sorted;
}

class ListElementDeduplicator : public metkit::hypercube::Deduplicator<ListElement> {
public:
bool toReplace(const ListElement& existing, const ListElement& replacement) const override {
return existing.timestamp() < replacement.timestamp();
}
};

eckit::DataHandle* FDB::read(const eckit::URI& uri) {
FieldLocation* loc = FieldLocationFactory::instance().build(uri.scheme(), uri);
return loc->dataHandle();
Expand All @@ -165,54 +159,8 @@ eckit::DataHandle* FDB::read(const std::vector<eckit::URI>& uris, bool sorted) {


eckit::DataHandle* FDB::read(ListIterator& it, bool sorted) {
eckit::Timer timer;
timer.start();

HandleGatherer result(sorted);
ListElement el;

static bool dedup = eckit::Resource<bool>("fdbDeduplicate;$FDB_DEDUPLICATE_FIELDS", false);
if (dedup) {
if (it.next(el)) {
// build the request representing the tensor-product of all retrieved fields
metkit::mars::MarsRequest cubeRequest = el.combinedKey().request();
std::vector<ListElement> elements{el};

while (it.next(el)) {
cubeRequest.merge(el.combinedKey().request());
elements.push_back(el);
}

// checking all retrieved fields against the hypercube, to remove duplicates
ListElementDeduplicator dedup;
metkit::hypercube::HyperCubePayloaded<ListElement> cube(cubeRequest, dedup);
for(auto el: elements) {
cube.add(el.combinedKey().request(), el);
}

if (cube.countVacant() > 0) {
std::stringstream ss;
ss << "No matching data for requests:" << std::endl;
for (auto req: cube.vacantRequests()) {
ss << " " << req << std::endl;
}
eckit::Log::warning() << ss.str() << std::endl;
}

for (size_t i=0; i< cube.size(); i++) {
ListElement element;
if (cube.find(i, element)) {
result.add(element.location().dataHandle());
}
}
}
}
else {
while (it.next(el)) {
result.add(el.location().dataHandle());
}
}
return result.dataHandle();
return new FieldHandle(it);
}

eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request) {
Expand Down
268 changes: 268 additions & 0 deletions src/fdb5/io/FieldHandle.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/*
* (C) Copyright 1996- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

#include <numeric>

#include "eckit/io/MemoryHandle.h"
#include "eckit/config/Resource.h"
#include "eckit/log/Timer.h"
#include "eckit/runtime/Metrics.h"
#include "eckit/types/Types.h"

#include "metkit/mars/MarsRequest.h"
#include "metkit/hypercube/HyperCubePayloaded.h"

#include "fdb5/io/FieldHandle.h"

namespace fdb5 {

//----------------------------------------------------------------------------------------------------------------------

class ListElementDeduplicator : public metkit::hypercube::Deduplicator<ListElement> {
public:
bool toReplace(const ListElement& existing, const ListElement& replacement) const override {
return existing.timestamp() < replacement.timestamp();
}
};

//----------------------------------------------------------------------------------------------------------------------

FieldHandle::FieldHandle(ListIterator& it) :
fields_({}), totalSize_(0), currentIdx_(0), current_(nullptr), currentMemoryHandle_(false), sorted_(false), seekable_(true) {
ListElement el;
eckit::Length largest = 0;

static bool dedup = eckit::Resource<bool>("fdbDeduplicate;$FDB_DEDUPLICATE_FIELDS", false);
if (dedup) {
if (it.next(el)) {
// build the request representing the tensor-product of all retrieved fields
metkit::mars::MarsRequest cubeRequest = el.combinedKey().request();
std::vector<ListElement> elements{el};

while (it.next(el)) {
cubeRequest.merge(el.combinedKey().request());
elements.push_back(el);
}

// checking all retrieved fields against the hypercube, to remove duplicates
ListElementDeduplicator dedup;
metkit::hypercube::HyperCubePayloaded<ListElement> cube(cubeRequest, dedup);
for(auto el: elements) {
cube.add(el.combinedKey().request(), el);
}

if (cube.countVacant() > 0) {
std::stringstream ss;
ss << "No matching data for requests:" << std::endl;
for (auto req: cube.vacantRequests()) {
ss << " " << req << std::endl;
}
eckit::Log::warning() << ss.str() << std::endl;
}

for (size_t i=0; i< cube.size(); i++) {
ListElement element;
if (cube.find(i, element)) {
fields_.push_back(std::make_pair(el.location().length(), el.location().dataHandle()));
eckit::Length len = el.location().length();
totalSize_ += len;
bool canSeek = el.location().dataHandle()->canSeek();
if (!canSeek) {
largest = std::max(largest, len);
seekable_ = false;
}
}
}
}
}
else {
while (it.next(el)) {
fields_.push_back(std::make_pair(el.location().length(), el.location().dataHandle()));

eckit::Length len = el.location().length();
totalSize_ += len;
bool canSeek = el.location().dataHandle()->canSeek();
if (!canSeek) {
largest = std::max(largest, len);
seekable_ = false;
}
}
}

if (!seekable_) {
// allocate a buffer that can fit the largest not seekable field (to avoid re-allocations)
buffer_ = eckit::Buffer(largest);
}
}

FieldHandle::~FieldHandle() {}

void FieldHandle::openCurrent() {

if (current_ && currentMemoryHandle_) {
delete current_;
currentMemoryHandle_ = false;
}

if (currentIdx_ < fields_.size()) {

eckit::Length currentSize = fields_[currentIdx_].first;
current_ = fields_[currentIdx_].second;
current_->openForRead();

if (!current_->canSeek()) {
current_->read(buffer_.data(), currentSize);
current_ = new eckit::MemoryHandle(buffer_.data(), currentSize);
current_->openForRead();
currentMemoryHandle_ = true;
}
}
}

eckit::Length FieldHandle::openForRead() {

current_=0;
openCurrent();

return totalSize_;
}

long FieldHandle::read1(char* buffer, long length) {
if (currentIdx_ >= fields_.size()) {
return 0;
}

long n = current_->read(buffer, length);
if (n <= 0) {
current_->close();
currentIdx_++;
openCurrent();
return read1(buffer, length);
}
return n;
}

long FieldHandle::read(void* buffer, long length) {
char* p = static_cast<char*>(buffer);
long n = 0;
long total = 0;

while (length > 0 && (n = read1(p, length)) > 0) {
length -= n;
total += n;
p += n;
}

eckit::Log::debug() << "FieldHandle::read " << (total > 0 ? total : n) << std::endl;

return total > 0 ? total : n;
}

void FieldHandle::close() {
if (currentIdx_ != fields_.size()) {
current_->close();
if (currentMemoryHandle_) {
delete current_;
}
}
currentIdx_ = fields_.size();
}

void FieldHandle::rewind() {
if (currentIdx_ == 0 || seekable_) {
if (current_ && currentIdx_ < fields_.size()) {
current_->close();
}
currentIdx_ = 0;
openCurrent();
} else {
throw eckit::ReadError("rewind not supported");
}
}

void FieldHandle::print(std::ostream& s) const {
if (eckit::format(s) == eckit::Log::compactFormat) {
s << "FieldHandle";
}
else {
s << "FieldHandle[";
for (size_t i = 0; i < fields_.size(); i++) {
if (i != 0) {
s << ",(";
}
fields_[i].second->print(s);
s << ")";
}
s << ']';
}
}

eckit::Length FieldHandle::size() {
return totalSize_;
}

eckit::Length FieldHandle::estimate() {
return totalSize_;
}

// only partial seek: within the current message and forward
bool FieldHandle::canSeek() const {
return true;
}

eckit::Offset FieldHandle::position() {
long long accumulated = 0;
for (size_t idx = 0; idx < currentIdx_; idx++) {
accumulated += fields_[idx].first;
}
return accumulated + (currentIdx_ >= fields_.size() ? eckit::Offset(0) : current_->position());
}

eckit::Offset FieldHandle::seek(const eckit::Offset& offset) {
if (current_ && currentIdx_ < fields_.size()) {
current_->close();
}

const long long seekto = offset;
long long accumulated = 0;

if (!seekable_) { // check that the offset is within the current message of later
for (size_t idx = 0; idx < currentIdx_; idx++) {
accumulated += fields_[idx].first;
}

if (seekto < accumulated) {
throw eckit::UserError("Cannot seek backward to previous data fields");
}
}

accumulated = 0;
for (currentIdx_ = 0; currentIdx_ < fields_.size(); ++currentIdx_) {
long long size = fields_[currentIdx_].first;
if (accumulated <= seekto && seekto < accumulated + size) {
openCurrent();
current_->seek(seekto - accumulated);
return offset;
}
accumulated += size;
}
// check if we went beyond EOF which is POSIX compliant, but we ASSERT so we find possible bugs
eckit::Offset beyond = seekto - accumulated;
ASSERT(not beyond);
return offset;
}

bool FieldHandle::compress(bool) {
return false;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace eckit
Loading

0 comments on commit b5c2a77

Please sign in to comment.