diff --git a/src/fdb5/tools/fdb-hammer.cc b/src/fdb5/tools/fdb-hammer.cc index f299a7956..31b05cadd 100644 --- a/src/fdb5/tools/fdb-hammer.cc +++ b/src/fdb5/tools/fdb-hammer.cc @@ -32,6 +32,13 @@ #include "fdb5/tools/FDBTool.h" #include "fdb5/api/helpers/FDBToolRequest.h" +#include +#include +#include "eckit/log/TimeStamp.h" +#include "eckit/utils/MD5.h" +#include "eckit/message/Reader.h" +#include "eckit/message/Message.h" + // uncomment for daos runs //#include "fdb5/daos/DaosSession.h" @@ -67,7 +74,7 @@ class FDBHammer : public fdb5::FDBTool { FDBHammer(int argc, char **argv) : fdb5::FDBTool(argc, argv), - verbose_(false) { + verbose_(false), md_check_(false), full_check_(false) { options_.push_back(new eckit::option::SimpleOption("expver", "Reset expver on data")); options_.push_back(new eckit::option::SimpleOption("class", "Reset class on data")); @@ -84,15 +91,31 @@ class FDBHammer : public fdb5::FDBTool { // options_.push_back(new eckit::option::SimpleOption("proc-id", "Identifier of the process running fdb-hammer")); options_.push_back(new eckit::option::SimpleOption("verbose", "Print verbose output")); options_.push_back(new eckit::option::SimpleOption("disable-subtocs", "Disable use of subtocs")); + options_.push_back(new eckit::option::SimpleOption("md-check", + "Calculate a metadata checksum (checksum of the field key + unique operation identifier) on every field write and " + "insert it at the begginning and end of the encoded message data payload. " + "On every field read, the checksums are extracted from the message data payload, the checksum of the FDB field key " + "is recalculated, and all checksums are checked to match.")); + options_.push_back(new eckit::option::SimpleOption("full-check", + "Insert a metadata checksum (checksum of the field key + unique operation identifier) and a checksum of the full " + "field data payload, on every field write, at the begginning of the encoded message data payload. The data checksum " + "includes the unique operation identifier in its calculation, and the resulting checksum is written at the beginning " + "of the encoded message data payload between the field key checksum and the operation identifier. " + "On every field read, the key checksum is extracted from the message data payload, the key checksum is recalculated " + "from FDB field key, and both checksums are compared. The checksum of the data payload is also extracted and " + "recalculated from the message data payload (excluding the key checksum but including the operation identifier), " + "and both checksums are checked to match.")); } ~FDBHammer() override {} private: bool verbose_; + bool md_check_; + bool full_check_; }; void FDBHammer::usage(const std::string &tool) const { - eckit::Log::info() << std::endl << "Usage: " << tool << " [--statistics] [--read] [--list] --nsteps= --nensembles= --nlevels= --nparams= --expver= " << std::endl; + eckit::Log::info() << std::endl << "Usage: " << tool << " [--statistics] [--read] [--list] [--md-check|--full-check] --nsteps= --nensembles= --nlevels= --nparams= --expver= " << std::endl; fdb5::FDBTool::usage(tool); } @@ -107,6 +130,10 @@ void FDBHammer::init(const eckit::option::CmdArgs& args) ASSERT(args.has("nparams")); verbose_ = args.getBool("verbose", false); + + md_check_ = args.getBool("md-check", false); + full_check_ = args.getBool("full-check", false); + if (full_check_) md_check_ = false; } void FDBHammer::execute(const eckit::option::CmdArgs &args) { @@ -153,6 +180,8 @@ void FDBHammer::executeWrite(const eckit::option::CmdArgs &args) { size = cls.length(); CODES_CHECK(codes_set_string(handle, "class", cls.c_str(), &size), 0); + eckit::Translator str; + struct timeval tval_before_io, tval_after_io; eckit::Timer timer; eckit::Timer gribTimer; @@ -185,6 +214,87 @@ void FDBHammer::executeWrite(const eckit::option::CmdArgs &args) { CODES_CHECK(codes_get_message(handle, reinterpret_cast(&buffer), &size), 0); + if (full_check_ or md_check_) { + + // get message data offset + long offsetBeforeData = 0, offsetAfterData = 0; + CODES_CHECK(codes_get_long(handle, std::string("offsetBeforeData").c_str(), &offsetBeforeData), 0); + CODES_CHECK(codes_get_long(handle, std::string("offsetAfterData").c_str(), &offsetAfterData), 0); + + // generate a checksum of the FDB key + fdb5::Key key({ + {"number", str(member+number-1)}, + {"step", str(step)}, + {"level", str(lev+level-1)}, + {"param", str(real_param)}, + }); + std::string key_string(key); + eckit::MD5 md5(key_string); + std::string digest = md5.digest(); + uint64_t key_hi = std::stoull(digest.substr(0, 8), nullptr, 16); + uint64_t key_lo = std::stoull(digest.substr(8, 16), nullptr, 16); + + // generate a unique write operation ID and calculate its checksum + /// @note: copied from LocalPathName::unique. Ditched StaticMutex - not thread safe + std::string hostname = eckit::Main::hostname(); + static unsigned long long n = (((unsigned long long)::getpid()) << 32); + static std::string format = "%Y%m%d.%H%M%S"; + std::ostringstream os; + os << eckit::TimeStamp(format) << '.' << hostname << '.' << n++; + std::string uid = os.str(); + while (::access(uid.c_str(), F_OK) == 0) { + std::ostringstream os; + os << eckit::TimeStamp(format) << '.' << hostname << '.' << n++; + uid = os.str(); + } + md5.reset(); + md5.add(uid); + digest = md5.digest(); + uint64_t uid_hi = std::stoull(digest.substr(0, 8), nullptr, 16); + uint64_t uid_lo = std::stoull(digest.substr(8, 16), nullptr, 16); + + if (md_check_) { + + // write checksums in message data region + // message = grib header | fdb key checksum | unique id checksum | grib data | fdb key checksum | unique id checksum + ::memcpy(&const_cast(buffer)[offsetBeforeData], &key_hi, sizeof(key_hi)); + ::memcpy(&const_cast(buffer)[offsetBeforeData + sizeof(key_hi)], &key_lo, sizeof(key_lo)); + ::memcpy(&const_cast(buffer)[offsetBeforeData + 2 * sizeof(key_lo)], &uid_hi, sizeof(uid_hi)); + ::memcpy(&const_cast(buffer)[offsetBeforeData + 3 * sizeof(key_lo)], &uid_lo, sizeof(uid_lo)); + ::memcpy(&const_cast(buffer)[offsetAfterData - 4 * sizeof(key_lo)], &key_hi, sizeof(key_hi)); + ::memcpy(&const_cast(buffer)[offsetAfterData - 3 * sizeof(key_lo)], &key_lo, sizeof(key_lo)); + ::memcpy(&const_cast(buffer)[offsetAfterData - 2 * sizeof(key_lo)], &uid_hi, sizeof(uid_hi)); + ::memcpy(&const_cast(buffer)[offsetAfterData - 1 * sizeof(key_lo)], &uid_lo, sizeof(uid_lo)); + + } + + if (full_check_) { + + // checksums will be laid out as follows: + // message = grib header | fdb key checksum | data checksum | unique id checksum | grib data + + long uidChecksumOffset = offsetBeforeData + 4 * sizeof(key_lo); + + // write operation uid checksum + ::memcpy(&const_cast(buffer)[uidChecksumOffset], &uid_hi, sizeof(uid_hi)); + ::memcpy(&const_cast(buffer)[uidChecksumOffset + sizeof(key_lo)], &uid_lo, sizeof(uid_lo)); + + // calculate cheksum of data excluding the key and data checksum ranges + md5.reset(); + md5.add(buffer + uidChecksumOffset, offsetAfterData - uidChecksumOffset); + digest = md5.digest(); + uint64_t data_hi = std::stoull(digest.substr(0, 8), nullptr, 16); + uint64_t data_lo = std::stoull(digest.substr(8, 16), nullptr, 16); + + // write checksums in message data region + ::memcpy(&const_cast(buffer)[offsetBeforeData], &key_hi, sizeof(key_hi)); + ::memcpy(&const_cast(buffer)[offsetBeforeData + sizeof(key_hi)], &key_lo, sizeof(key_lo)); + ::memcpy(&const_cast(buffer)[offsetBeforeData + 2 * sizeof(key_lo)], &data_hi, sizeof(data_hi)); + ::memcpy(&const_cast(buffer)[offsetBeforeData + 3 * sizeof(key_lo)], &data_lo, sizeof(data_lo)); + + } + } + gribTimer.stop(); elapsed_grib += gribTimer.elapsed(); @@ -312,8 +422,35 @@ void FDBHammer::executeRead(const eckit::option::CmdArgs &args) { std::unique_ptr dh(handles.dataHandle()); - EmptyHandle nullOutputHandle; - size_t total = dh->copyTo(nullOutputHandle); + eckit::Optional buff; + eckit::Optional mh; + eckit::Optional original_size; + + size_t total = 0; + + if (full_check_ || md_check_) { + // if storing all read data in memory for later checksum calculation and verification + // is not an option, it could be stored and processed by parts as follows: + // + // struct Hack { + // bool sorted_; + // std::vector handles_; + // size_t count_; + // } + // Hack* dh2 = reinterpret_cast(dh.get()); + // for (auto& ph : dh2->datahandles_) { ... } + + eckit::FileHandle fin(args(0)); + original_size.emplace(fin.size()); + size_t expected = nensembles * nsteps * nlevels * nparams * (size_t)original_size.value(); + buff.emplace(expected); + mh.emplace(buff.value(), expected); + total = dh->saveInto(mh.value()); + } else { + EmptyHandle nullOutputHandle; + total = dh->saveInto(nullOutputHandle); + } + gettimeofday(&tval_after_io, NULL); // uncomment for lustre runs @@ -321,6 +458,97 @@ void FDBHammer::executeRead(const eckit::option::CmdArgs &args) { timer.stop(); + if (full_check_ || md_check_) { + + eckit::Translator str; + + eckit::message::Reader reader(mh.value()); + eckit::message::Message msg; + + for (size_t member = 1; member <= nensembles; ++member) { + for (size_t step = 0; step < nsteps; ++step) { + for (size_t lev = 1; lev <= nlevels; ++lev) { + for (size_t param = 1, real_param = 1; param <= nparams; ++param, ++real_param) { + // GRIB API only allows us to use certain parameters + while (AWKWARD_PARAMS.find(real_param) != AWKWARD_PARAMS.end()) { + real_param++; + } + + if (!(msg = reader.next())) throw eckit::Exception("Found less fields than expected."); + + if (eckit::Length(msg.length()) != original_size.value()) throw eckit::Exception("Found a field of different size than the seed."); + + // TODO: get full grib key to ensure grib header is not corrupted + //fdb5::Key keycheck; + //fdb5::KeySetter setter(keycheck); + //msg.getMetadata(setter); + + // get message data offset + long offsetBeforeData = msg.getLong("offsetBeforeData"); + long offsetAfterData = msg.getLong("offsetAfterData"); + + // generate a checksum of the FDB key + fdb5::Key key({ + {"number", str(member+number-1)}, + {"step", str(step)}, + {"level", str(lev+level-1)}, + {"param", str(real_param)}, + }); + std::string key_string(key); + eckit::MD5 md5(key_string); + std::string digest = md5.digest(); + uint64_t key_hi = std::stoull(digest.substr(0, 8), nullptr, 16); + uint64_t key_lo = std::stoull(digest.substr(8, 16), nullptr, 16); + + if (md_check_) { + + long key_hi1 = 0, key_lo1 = 0, key_hi2 = 0, key_lo2 = 0; + long uid_hi1 = 0, uid_lo1 = 0, uid_hi2 = 0, uid_lo2 = 0; + ::memcpy(&key_hi1, &((char*)(msg.data()))[offsetBeforeData], sizeof(key_hi)); + ::memcpy(&key_lo1, &((char*)(msg.data()))[offsetBeforeData + sizeof(key_hi)], sizeof(key_hi)); + ::memcpy(&uid_hi1, &((char*)(msg.data()))[offsetBeforeData + 2 * sizeof(key_hi)], sizeof(key_hi)); + ::memcpy(&uid_lo1, &((char*)(msg.data()))[offsetBeforeData + 3 * sizeof(key_hi)], sizeof(key_hi)); + ::memcpy(&key_hi2, &((char*)(msg.data()))[offsetAfterData - 4 * sizeof(key_lo)], sizeof(key_hi)); + ::memcpy(&key_lo2, &((char*)(msg.data()))[offsetAfterData - 3 * sizeof(key_lo)], sizeof(key_hi)); + ::memcpy(&uid_hi2, &((char*)(msg.data()))[offsetAfterData - 2 * sizeof(key_lo)], sizeof(key_hi)); + ::memcpy(&uid_lo2, &((char*)(msg.data()))[offsetAfterData - 1 * sizeof(key_lo)], sizeof(key_hi)); + + ASSERT(key_hi == key_hi1 && key_hi == key_hi2); + ASSERT(key_lo == key_lo1 && key_lo == key_lo2); + ASSERT(uid_hi1 == uid_hi2); + ASSERT(uid_lo1 == uid_lo2); + + } + + if (full_check_) { + + // calculate cheksum of data excluding the key and data checksum ranges + long uidChecksumOffset = offsetBeforeData + 4 * sizeof(key_lo); + eckit::MD5 md5(&((char*)(msg.data()))[uidChecksumOffset], offsetAfterData - uidChecksumOffset); + digest = md5.digest(); + uint64_t data_hi = std::stoull(digest.substr(0, 8), nullptr, 16); + uint64_t data_lo = std::stoull(digest.substr(8, 16), nullptr, 16); + + long key_hi1 = 0, key_lo1 = 0, data_hi1 = 0, data_lo1 = 0; + ::memcpy(&key_hi1, &((char*)(msg.data()))[offsetBeforeData], sizeof(key_hi)); + ::memcpy(&key_lo1, &((char*)(msg.data()))[offsetBeforeData + sizeof(key_hi)], sizeof(key_hi)); + ::memcpy(&data_hi1, &((char*)(msg.data()))[offsetBeforeData + 2 * sizeof(key_hi)], sizeof(key_hi)); + ::memcpy(&data_lo1, &((char*)(msg.data()))[offsetBeforeData + 3 * sizeof(key_hi)], sizeof(key_hi)); + + ASSERT(key_hi == key_hi1); + ASSERT(key_lo == key_lo1); + ASSERT(data_hi == data_hi1); + ASSERT(data_lo == data_lo1); + + } + + } + } + } + } + + } + // uncomment for daos runs //fdb5::DaosManager::instance().stats().report(std::cout); // uncomment for rados runs @@ -328,11 +556,11 @@ void FDBHammer::executeRead(const eckit::option::CmdArgs &args) { // uncomment for lustre runs //fdb5::TocManager::instance().stats().report(std::cout); - Log::info() << "fdb-hammer - Fields read: " << fieldsRead << std::endl; - Log::info() << "fdb-hammer - Bytes read: " << total << std::endl; - Log::info() << "fdb-hammer - Total duration: " << timer.elapsed() << std::endl; - // Log::info() << "fdb-hammer - Total rate: " << double(total) / timer.elapsed() << " bytes / s" << std::endl; - // Log::info() << "fdb-hammer - Total rate: " << double(total) / (timer.elapsed() * 1024 * 1024) << " MB / s" << std::endl; + Log::info() << "Fields read: " << fieldsRead << std::endl; + Log::info() << "Bytes read: " << total << std::endl; + Log::info() << "Total duration: " << timer.elapsed() << std::endl; + //Log::info() << "Total rate: " << double(total) / timer.elapsed() << " bytes / s" << std::endl; + //Log::info() << "Total rate: " << double(total) / (timer.elapsed() * 1024 * 1024) << " MB / s" << std::endl; Log::info() << "Timestamp before first IO: " << (long int)tval_before_io.tv_sec << "." <<