Skip to content

Commit

Permalink
Implemented two consistency check modes which can be enabled with --chec…
Browse files Browse the repository at this point in the history
…k-md and --check-full.
  • Loading branch information
nicolau-manubens committed Nov 5, 2024
1 parent 1d93853 commit 556c89c
Showing 1 changed file with 237 additions and 9 deletions.
246 changes: 237 additions & 9 deletions src/fdb5/tools/fdb-hammer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
#include "fdb5/tools/FDBTool.h"
#include "fdb5/api/helpers/FDBToolRequest.h"

#include <unistd.h>
#include <limits.h>
#include "eckit/log/TimeStamp.h"
#include "eckit/utils/MD5.h"
#include "eckit/message/Reader.h"
#include "eckit/message/Message.h"

// uncomment for daos runs
//#include "fdb5/daos/DaosSession.h"

Expand Down Expand Up @@ -67,7 +74,7 @@ class FDBHammer : public fdb5::FDBTool {

FDBHammer(int argc, char **argv) :
fdb5::FDBTool(argc, argv),
verbose_(false) {
verbose_(false), md_check_(false), full_check_(false) {

options_.push_back(new eckit::option::SimpleOption<std::string>("expver", "Reset expver on data"));
options_.push_back(new eckit::option::SimpleOption<std::string>("class", "Reset class on data"));
Expand All @@ -84,15 +91,31 @@ class FDBHammer : public fdb5::FDBTool {
// options_.push_back(new eckit::option::SimpleOption<long>("proc-id", "Identifier of the process running fdb-hammer"));
options_.push_back(new eckit::option::SimpleOption<bool>("verbose", "Print verbose output"));
options_.push_back(new eckit::option::SimpleOption<bool>("disable-subtocs", "Disable use of subtocs"));
// --md-check: metadata-only consistency mode. The help text below is user-facing.
options_.push_back(new eckit::option::SimpleOption<bool>("md-check",
"Calculate a metadata checksum (checksum of the field key + unique operation identifier) on every field write and "
"insert it at the beginning and end of the encoded message data payload. "
"On every field read, the checksums are extracted from the message data payload, the checksum of the FDB field key "
"is recalculated, and all checksums are checked to match."));
// --full-check: metadata + data payload consistency mode. The help text below is user-facing.
options_.push_back(new eckit::option::SimpleOption<bool>("full-check",
"Insert a metadata checksum (checksum of the field key + unique operation identifier) and a checksum of the full "
"field data payload, on every field write, at the beginning of the encoded message data payload. The data checksum "
"includes the unique operation identifier in its calculation, and the resulting checksum is written at the beginning "
"of the encoded message data payload between the field key checksum and the operation identifier. "
"On every field read, the key checksum is extracted from the message data payload, the key checksum is recalculated "
"from FDB field key, and both checksums are compared. The checksum of the data payload is also extracted and "
"recalculated from the message data payload (excluding the key checksum but including the operation identifier), "
"and both checksums are checked to match."));
}
~FDBHammer() override {}

private:
// Verbose output flag; set from the "verbose" option in init().
bool verbose_;
// Metadata-only consistency check; set from the "md-check" option in init()
// and cleared there when "full-check" is also requested.
bool md_check_;
// Metadata + data payload consistency check; set from the "full-check" option in init().
bool full_check_;
};

/// Prints the fdb-hammer command-line synopsis, then the generic FDBTool usage.
/// @param tool  executable name shown in the synopsis
void FDBHammer::usage(const std::string &tool) const {
    // One synopsis line only; it lists the two alternative consistency-check flags.
    eckit::Log::info() << std::endl << "Usage: " << tool << " [--statistics] [--read] [--list] [--md-check|--full-check] --nsteps=<nsteps> --nensembles=<nensembles> --nlevels=<nlevels> --nparams=<nparams> --expver=<expver> <grib_path>" << std::endl;
    fdb5::FDBTool::usage(tool);
}

Expand All @@ -107,6 +130,10 @@ void FDBHammer::init(const eckit::option::CmdArgs& args)
ASSERT(args.has("nparams"));

verbose_ = args.getBool("verbose", false);

// Consistency-check modes requested on the command line; both default to off.
md_check_ = args.getBool("md-check", false);
full_check_ = args.getBool("full-check", false);
// The full check takes precedence: when it is requested the metadata-only check is switched off,
// so at most one of the two flags is set after init().
if (full_check_) md_check_ = false;
}

void FDBHammer::execute(const eckit::option::CmdArgs &args) {
Expand Down Expand Up @@ -153,6 +180,8 @@ void FDBHammer::executeWrite(const eckit::option::CmdArgs &args) {
size = cls.length();
CODES_CHECK(codes_set_string(handle, "class", cls.c_str(), &size), 0);

eckit::Translator<size_t,std::string> str;

struct timeval tval_before_io, tval_after_io;
eckit::Timer timer;
eckit::Timer gribTimer;
Expand Down Expand Up @@ -185,6 +214,87 @@ void FDBHammer::executeWrite(const eckit::option::CmdArgs &args) {

CODES_CHECK(codes_get_message(handle, reinterpret_cast<const void**>(&buffer), &size), 0);

// Consistency checking: embed checksums inside the data region of the encoded message
// before it is handed over for archival. The read path recomputes and compares them.
if (full_check_ || md_check_) {

    // Every checksum is stored as a pair of 8-byte words.
    constexpr long kWord = sizeof(uint64_t);

    // Byte offsets, within the encoded message, of the start and end of the data region.
    long offsetBeforeData = 0, offsetAfterData = 0;
    CODES_CHECK(codes_get_long(handle, "offsetBeforeData", &offsetBeforeData), 0);
    CODES_CHECK(codes_get_long(handle, "offsetAfterData", &offsetAfterData), 0);

    // The checksum words overwrite part of the data region: refuse to write outside the
    // message or into a region too small to hold them
    // (md-check: 4 words at each end; full-check: 6 words at the beginning).
    const long wordsNeeded = md_check_ ? 8 : 6;
    ASSERT(offsetBeforeData >= 0);
    ASSERT(offsetAfterData <= static_cast<long>(size));
    ASSERT(offsetAfterData - offsetBeforeData >= wordsNeeded * kWord);

    // Parses `len` hexadecimal digits of a digest, starting at `pos`, into one word.
    auto hexWord = [](const std::string& hex, size_t pos, size_t len) -> uint64_t {
        return std::stoull(hex.substr(pos, len), nullptr, 16);
    };

    // Stores one word at the given byte offset of the message.
    // NOTE(review): this writes through a const_cast into the buffer returned by
    // codes_get_message; presumably that memory is owned by `handle` and writable - confirm.
    char* payload = const_cast<char*>(buffer);
    auto putWord = [payload](long offset, uint64_t word) {
        ::memcpy(payload + offset, &word, sizeof(word));
    };

    // generate a checksum of the FDB key
    fdb5::Key key({
        {"number", str(member+number-1)},
        {"step", str(step)},
        {"level", str(lev+level-1)},
        {"param", str(real_param)},
    });
    std::string key_string(key);
    eckit::MD5 md5(key_string);
    std::string digest = md5.digest();
    // NOTE: "hi" is built from digest characters [0, 8) and "lo" from [8, 24), i.e. the two
    // words carry 32 and 64 bits of the digest. The read path splits the digest identically;
    // both sides must be changed together if this is ever revised.
    const uint64_t key_hi = hexWord(digest, 0, 8);
    const uint64_t key_lo = hexWord(digest, 8, 16);

    // generate a unique write operation ID and calculate its checksum
    /// @note: adapted from LocalPathName::unique. Ditched StaticMutex - not thread safe.
    /// The file-existence probe of the original was dropped: the identifier is never used
    /// as a path, so probing the filesystem only added a system call per field.
    const std::string hostname = eckit::Main::hostname();
    static unsigned long long n = (static_cast<unsigned long long>(::getpid()) << 32);
    static const std::string format = "%Y%m%d.%H%M%S";
    std::ostringstream os;
    os << eckit::TimeStamp(format) << '.' << hostname << '.' << n++;
    const std::string uid = os.str();

    md5.reset();
    md5.add(uid);
    digest = md5.digest();
    const uint64_t uid_hi = hexWord(digest, 0, 8);
    const uint64_t uid_lo = hexWord(digest, 8, 16);

    if (md_check_) {

        // write checksums in message data region
        // message = grib header | fdb key checksum | unique id checksum | grib data | fdb key checksum | unique id checksum
        putWord(offsetBeforeData, key_hi);
        putWord(offsetBeforeData + 1 * kWord, key_lo);
        putWord(offsetBeforeData + 2 * kWord, uid_hi);
        putWord(offsetBeforeData + 3 * kWord, uid_lo);
        putWord(offsetAfterData - 4 * kWord, key_hi);
        putWord(offsetAfterData - 3 * kWord, key_lo);
        putWord(offsetAfterData - 2 * kWord, uid_hi);
        putWord(offsetAfterData - 1 * kWord, uid_lo);

    }

    if (full_check_) {

        // checksums will be laid out as follows:
        // message = grib header | fdb key checksum | data checksum | unique id checksum | grib data

        const long uidChecksumOffset = offsetBeforeData + 4 * kWord;

        // write operation uid checksum first: it is part of the range covered by the data checksum
        putWord(uidChecksumOffset, uid_hi);
        putWord(uidChecksumOffset + kWord, uid_lo);

        // calculate checksum of data excluding the key and data checksum ranges
        md5.reset();
        md5.add(buffer + uidChecksumOffset, offsetAfterData - uidChecksumOffset);
        digest = md5.digest();
        const uint64_t data_hi = hexWord(digest, 0, 8);
        const uint64_t data_lo = hexWord(digest, 8, 16);

        // write checksums in message data region
        putWord(offsetBeforeData, key_hi);
        putWord(offsetBeforeData + 1 * kWord, key_lo);
        putWord(offsetBeforeData + 2 * kWord, data_hi);
        putWord(offsetBeforeData + 3 * kWord, data_lo);

    }
}

gribTimer.stop();
elapsed_grib += gribTimer.elapsed();

Expand Down Expand Up @@ -312,27 +422,145 @@ void FDBHammer::executeRead(const eckit::option::CmdArgs &args) {

std::unique_ptr<eckit::DataHandle> dh(handles.dataHandle());

// Holders for the in-memory copy of the retrieved data. They are only populated when a
// consistency check is enabled, and are read again by the verification step further down.
eckit::Optional<eckit::Buffer> buff;
eckit::Optional<eckit::MemoryHandle> mh;
eckit::Optional<eckit::Length> original_size;

// Number of bytes transferred out of the FDB data handle. The handle is consumed exactly
// once, by one of the two branches below.
size_t total = 0;

if (full_check_ || md_check_) {
    // if storing all read data in memory for later checksum calculation and verification
    // is not an option, it could be stored and processed by parts as follows:
    //
    // struct Hack {
    //     bool sorted_;
    //     std::vector<eckit::DataHandle *> handles_;
    //     size_t count_;
    // };
    // Hack* dh2 = reinterpret_cast<Hack *>(dh.get());
    // for (auto& ph : dh2->handles_) { ... }

    // Size the buffer from the seed file given on the command line: every retrieved
    // field is expected to have the same length as the seed message.
    eckit::FileHandle fin(args(0));
    original_size.emplace(fin.size());
    size_t expected = nensembles * nsteps * nlevels * nparams * static_cast<size_t>(original_size.value());
    buff.emplace(expected);
    mh.emplace(buff.value(), expected);
    total = dh->saveInto(mh.value());
} else {
    // No verification requested: discard the data, only the transfer itself is of interest.
    EmptyHandle nullOutputHandle;
    total = dh->saveInto(nullOutputHandle);
}

gettimeofday(&tval_after_io, NULL);

// uncomment for lustre runs
//st.stop();

timer.stop();

// Consistency checking: walk the retrieved messages in the order they were requested and
// compare the checksums embedded in each data region with freshly recomputed ones.
if (full_check_ || md_check_) {

    eckit::Translator<size_t,std::string> str;

    eckit::message::Reader reader(mh.value());
    eckit::message::Message msg;

    // Every checksum is stored as a pair of 8-byte words
    // (md-check: 4 words at each end of the data region; full-check: 6 words at its beginning).
    constexpr long kWord = sizeof(uint64_t);
    const long wordsNeeded = md_check_ ? 8 : 6;

    // Parses `len` hexadecimal digits of a digest, starting at `pos`, into one word.
    // NOTE: callers use (0, 8) for "hi" and (8, 16) for "lo", mirroring the write path;
    // both sides must be changed together if this split is ever revised.
    auto hexWord = [](const std::string& hex, size_t pos, size_t len) -> uint64_t {
        return std::stoull(hex.substr(pos, len), nullptr, 16);
    };

    for (size_t member = 1; member <= nensembles; ++member) {
        for (size_t step = 0; step < nsteps; ++step) {
            for (size_t lev = 1; lev <= nlevels; ++lev) {
                for (size_t param = 1, real_param = 1; param <= nparams; ++param, ++real_param) {
                    // GRIB API only allows us to use certain parameters
                    while (AWKWARD_PARAMS.find(real_param) != AWKWARD_PARAMS.end()) {
                        real_param++;
                    }

                    if (!(msg = reader.next())) throw eckit::Exception("Found less fields than expected.");

                    if (eckit::Length(msg.length()) != original_size.value()) throw eckit::Exception("Found a field of different size than the seed.");

                    // TODO: get full grib key to ensure grib header is not corrupted
                    //fdb5::Key keycheck;
                    //fdb5::KeySetter setter(keycheck);
                    //msg.getMetadata(setter);

                    // get message data offsets and make sure every word read below lies
                    // inside the message (a corrupted header could report anything)
                    const long offsetBeforeData = msg.getLong("offsetBeforeData");
                    const long offsetAfterData = msg.getLong("offsetAfterData");
                    ASSERT(offsetBeforeData >= 0);
                    ASSERT(offsetAfterData <= static_cast<long>(msg.length()));
                    ASSERT(offsetAfterData - offsetBeforeData >= wordsNeeded * kWord);

                    // Loads one word from the given byte offset of the message. The words are
                    // read into uint64_t so the copy size always matches the destination and
                    // the comparisons below are between values of the same type.
                    const char* payload = static_cast<const char*>(msg.data());
                    auto getWord = [payload](long offset) -> uint64_t {
                        uint64_t word = 0;
                        ::memcpy(&word, payload + offset, sizeof(word));
                        return word;
                    };

                    // generate a checksum of the FDB key
                    fdb5::Key key({
                        {"number", str(member+number-1)},
                        {"step", str(step)},
                        {"level", str(lev+level-1)},
                        {"param", str(real_param)},
                    });
                    std::string key_string(key);
                    eckit::MD5 md5(key_string);
                    std::string digest = md5.digest();
                    const uint64_t key_hi = hexWord(digest, 0, 8);
                    const uint64_t key_lo = hexWord(digest, 8, 16);

                    if (md_check_) {

                        // message = grib header | fdb key checksum | unique id checksum | grib data | fdb key checksum | unique id checksum
                        const uint64_t key_hi1 = getWord(offsetBeforeData);
                        const uint64_t key_lo1 = getWord(offsetBeforeData + 1 * kWord);
                        const uint64_t uid_hi1 = getWord(offsetBeforeData + 2 * kWord);
                        const uint64_t uid_lo1 = getWord(offsetBeforeData + 3 * kWord);
                        const uint64_t key_hi2 = getWord(offsetAfterData - 4 * kWord);
                        const uint64_t key_lo2 = getWord(offsetAfterData - 3 * kWord);
                        const uint64_t uid_hi2 = getWord(offsetAfterData - 2 * kWord);
                        const uint64_t uid_lo2 = getWord(offsetAfterData - 1 * kWord);

                        // the key checksum must match the recomputed one at both ends; the
                        // operation id cannot be recomputed, so its two copies must agree
                        ASSERT(key_hi == key_hi1 && key_hi == key_hi2);
                        ASSERT(key_lo == key_lo1 && key_lo == key_lo2);
                        ASSERT(uid_hi1 == uid_hi2);
                        ASSERT(uid_lo1 == uid_lo2);

                    }

                    if (full_check_) {

                        // message = grib header | fdb key checksum | data checksum | unique id checksum | grib data
                        // calculate checksum of data excluding the key and data checksum ranges
                        const long uidChecksumOffset = offsetBeforeData + 4 * kWord;
                        eckit::MD5 dataMd5(payload + uidChecksumOffset, offsetAfterData - uidChecksumOffset);
                        digest = dataMd5.digest();
                        const uint64_t data_hi = hexWord(digest, 0, 8);
                        const uint64_t data_lo = hexWord(digest, 8, 16);

                        const uint64_t key_hi1 = getWord(offsetBeforeData);
                        const uint64_t key_lo1 = getWord(offsetBeforeData + 1 * kWord);
                        const uint64_t data_hi1 = getWord(offsetBeforeData + 2 * kWord);
                        const uint64_t data_lo1 = getWord(offsetBeforeData + 3 * kWord);

                        ASSERT(key_hi == key_hi1);
                        ASSERT(key_lo == key_lo1);
                        ASSERT(data_hi == data_hi1);
                        ASSERT(data_lo == data_lo1);

                    }

                }
            }
        }
    }

}

// uncomment for daos runs
//fdb5::DaosManager::instance().stats().report(std::cout);
// uncomment for rados runs
eckit::RadosCluster::instance().stats().report(std::cout);
// uncomment for lustre runs
//fdb5::TocManager::instance().stats().report(std::cout);

// Summary of the read run; each figure is reported once.
Log::info() << "Fields read: " << fieldsRead << std::endl;
Log::info() << "Bytes read: " << total << std::endl;
Log::info() << "Total duration: " << timer.elapsed() << std::endl;
//Log::info() << "Total rate: " << double(total) / timer.elapsed() << " bytes / s" << std::endl;
//Log::info() << "Total rate: " << double(total) / (timer.elapsed() * 1024 * 1024) << " MB / s" << std::endl;

Log::info() << "Timestamp before first IO: " <<
(long int)tval_before_io.tv_sec << "." <<
Expand Down

0 comments on commit 556c89c

Please sign in to comment.