Skip to content

Commit

Permalink
Merge pull request #33 from ecmwf/feature/optimise-ranges
Browse files Browse the repository at this point in the history
Rename mc::range to mc::block, better bucket scaling
  • Loading branch information
ChrisspyB authored Nov 22, 2024
2 parents 7214a80 + 55fda3f commit ded3024
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 131 deletions.
157 changes: 108 additions & 49 deletions src/gribjump/ExtractionData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,117 @@ namespace gribjump {

namespace {

void encodeVector(eckit::Stream& s, const std::vector<double>& v) {
template <typename T>
void encodeVector(eckit::Stream& s, const std::vector<T>& v) {
size_t size = v.size();
s << size;
eckit::Buffer buffer(v.data(), size * sizeof(double));
eckit::Buffer buffer(v.data(), size * sizeof(T));
s << buffer;
}

std::vector<double> decodeVector(eckit::Stream& s) {
template <typename T>
std::vector<T> decodeVector(eckit::Stream& s) {
size_t size;
s >> size;
eckit::Buffer buffer(size * sizeof(double));
eckit::Buffer buffer(size * sizeof(T));
s >> buffer;
double* data = (double*) buffer.data();
T* data = (T*) buffer.data();

return std::vector<double>(data, data + size);
return std::vector<T>(data, data + size);
}

template <typename T>
void encodeVectorVector(eckit::Stream& s, const std::vector<std::vector<T>>& vv) {
std::vector<size_t> sizes;
sizes.reserve(vv.size());
size_t totalSize = 0;
for (auto& v : vv) {
sizes.push_back(v.size());
totalSize += v.size();
}
encodeVector(s, sizes);

// Make a flat vector
std::vector<T> flat;
flat.reserve(totalSize);
for (auto& v : vv) {
flat.insert(flat.end(), v.begin(), v.end());
}
encodeVector(s, flat);
}

template <typename T>
std::vector<std::vector<T>> decodeVectorVector(eckit::Stream& s) {
std::vector<size_t> sizes = decodeVector<size_t>(s);
std::vector<T> flat = decodeVector<T>(s);

std::vector<std::vector<T>> vv;
size_t pos = 0;
for (auto& size : sizes) {
vv.push_back(std::vector<T>(flat.begin() + pos, flat.begin() + pos + size));
pos += size;
}
return vv;
}

void encodeRanges(eckit::Stream& s, const std::vector<Range>& ranges) {
size_t size = ranges.size();
s << size;
eckit::Buffer buffer(ranges.data(), size * sizeof(size_t)*2); // does this really work?
s << buffer;
}

std::vector<Range> decodeRanges(eckit::Stream& s) {
size_t size;
s >> size;
eckit::Buffer buffer(size * sizeof(size_t)*2);
s >> buffer;
size_t* data = (size_t*) buffer.data();

std::vector<Range> ranges;
for (size_t i = 0; i < size; i++) {
ranges.push_back(std::make_pair(data[i*2], data[i*2+1]));
}

return ranges;
}

void encodeMask(eckit::Stream& s, const std::vector<std::vector<std::bitset<64>>>& mask) {

size_t totalSize = 0;
std::vector<size_t> sizes;
for (auto& v : mask) {
totalSize += v.size();
sizes.push_back(v.size());
}
std::vector<uint64_t> flat;
flat.reserve(totalSize);
for (auto& v : mask) {
for (auto& b : v) {
flat.push_back(b.to_ullong());
}
}
encodeVector(s, sizes);
encodeVector(s, flat);
}

std::vector<std::vector<std::bitset<64>>> decodeMask(eckit::Stream& s) {

std::vector<size_t> sizes = decodeVector<size_t>(s);
std::vector<uint64_t> flat = decodeVector<uint64_t>(s);

std::vector<std::vector<std::bitset<64>>> mask;
size_t pos = 0;
for (auto& size : sizes) {
std::vector<std::bitset<64>> maskBitset;
for (size_t i = 0; i < size; i++) {
maskBitset.push_back(std::bitset<64>(flat[pos + i]));
}
mask.push_back(maskBitset);
pos += size;
}
return mask;
}
} // namespace

ExtractionResult::ExtractionResult() {}
Expand All @@ -45,21 +139,13 @@ ExtractionResult::ExtractionResult(std::vector<std::vector<double>> values, std:
{}

ExtractionResult::ExtractionResult(eckit::Stream& s) {
size_t numRanges;
s >> numRanges;
for (size_t i = 0; i < numRanges; i++) {
values_.push_back(decodeVector(s));
}
values_ = decodeVectorVector<double>(s);
mask_ = decodeMask(s);
}

std::vector<std::vector<std::string>> bitsetStrings;
s >> bitsetStrings;
for (auto& v : bitsetStrings) {
std::vector<std::bitset<64>> bitset;
for (auto& b : v) {
bitset.push_back(std::bitset<64>(b));
}
mask_.push_back(bitset);
}
void ExtractionResult::encode(eckit::Stream& s) const {
encodeVectorVector(s, values_);
encodeMask(s, mask_);
}

void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsigned long** nvalues) {
Expand All @@ -72,24 +158,6 @@ void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsig
}
}

void ExtractionResult::encode(eckit::Stream& s) const {

s << values_.size(); // vector of vectors
for (auto& v : values_) {
encodeVector(s, v);
}

std::vector<std::vector<std::string>> bitsetStrings;
for (auto& v : mask_) {
std::vector<std::string> bitsetString;
for (auto& b : v) {
bitsetString.push_back(b.to_string());
}
bitsetStrings.push_back(bitsetString);
}
s << bitsetStrings;
}

void ExtractionResult::print(std::ostream& s) const {
s << "ExtractionResult[Values:[";
for (auto& v : values_) {
Expand Down Expand Up @@ -129,13 +197,7 @@ ExtractionRequest::ExtractionRequest() {}
ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
s >> request_;
s >> gridHash_;
size_t numRanges;
s >> numRanges;
for (size_t j = 0; j < numRanges; j++) {
size_t start, end;
s >> start >> end;
ranges_.push_back(std::make_pair(start, end));
}
ranges_ = decodeRanges(s);
}

eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
Expand All @@ -146,10 +208,7 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
void ExtractionRequest::encode(eckit::Stream& s) const {
s << request_;
s << gridHash_;
s << ranges_.size();
for (auto& [start, end] : ranges_) {
s << start << end;
}
encodeRanges(s, ranges_);
}

void ExtractionRequest::print(std::ostream& s) const {
Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/GribJumpDataAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ namespace gribjump {
class GribJumpDataAccessor : public mc::DataAccessor {

public:
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Range range) : dh_{dh}, data_section_range_{range} {}
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Block range) : dh_{dh}, data_section_range_{range} {}

void write(const eckit::Buffer& buffer, const size_t offset) const override {
NOTIMP;
}

eckit::Buffer read(const mc::Range& range) const override {
eckit::Buffer read(const mc::Block& range) const override {
eckit::Offset offset = range.first;
eckit::Length size = range.second;

Expand All @@ -51,7 +51,7 @@ class GribJumpDataAccessor : public mc::DataAccessor {

private:
eckit::DataHandle& dh_;
mc::Range data_section_range_;
mc::Block data_section_range_;
};

} // namespace gribjump
6 changes: 3 additions & 3 deletions src/gribjump/compression/DataAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DataAccessor {
public:
virtual ~DataAccessor() = default;
virtual void write(const eckit::Buffer& buffer, const size_t offset) const = 0;
virtual eckit::Buffer read(const Range& range) const = 0;
virtual eckit::Buffer read(const Block& range) const = 0;
virtual eckit::Buffer read() const = 0;
virtual size_t eof() const = 0;
};
Expand All @@ -43,7 +43,7 @@ class PosixAccessor : public DataAccessor {
NOTIMP;
}

eckit::Buffer read(const Range& range) const override {
eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
eckit::Buffer buf(size);
ifs_.seekg(offset, std::ios::beg);
Expand Down Expand Up @@ -89,7 +89,7 @@ class MemoryAccessor : public DataAccessor {
NOTIMP;
}

eckit::Buffer read(const Range& range) const override {
eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
if (offset + size > buf_.size()) {
std::stringstream ss;
Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/compression/NumericCompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ class NumericDecompressor {
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
virtual Values decode(const CompressedData&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Range&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Block&) = 0;


virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Range>& ranges) {
virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges) {
using Values = typename NumericDecompressor<ValueType>::Values;
std::vector<Values> result;
decode(accessor, ranges, result);
return result;
}

virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Range>& ranges, std::vector<Values>& result) {
virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges, std::vector<Values>& result) {
using Values = typename NumericDecompressor<ValueType>::Values;

std::unordered_map<Range, std::pair<Range, std::shared_ptr<Values>>> ranges_map;
std::unordered_map<Block, std::pair<Block, std::shared_ptr<Values>>> ranges_map;

// find which sub_ranges are in which buckets
RangeBuckets buckets;
BlockBuckets buckets;
for (const auto& range : ranges) {
buckets << range;
}
Expand Down
Loading

0 comments on commit ded3024

Please sign in to comment.