From 9f75045f5bca35a696f79e09d44495df7b4fccfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 25 Jul 2022 11:45:30 +0200 Subject: [PATCH] Preview support for Linear read mode without snapshot attribute Currently only available for BP5 engine, will be generalized into Linear read mode in #1291. If the backend does not support the snapshot attribute, then iterate in ascending order, skipping duplicate and non-linear iteration indices. Not possible if the Series is parsed ahead of time. --- src/ReadIterations.cpp | 57 +++++++++++++-- test/SerialIOTest.cpp | 157 +++++++++++++++++++++++++++++++++++------ 2 files changed, 185 insertions(+), 29 deletions(-) diff --git a/src/ReadIterations.cpp b/src/ReadIterations.cpp index 26e6bca797..b567aa0ff1 100644 --- a/src/ReadIterations.cpp +++ b/src/ReadIterations.cpp @@ -141,6 +141,10 @@ std::optional SeriesIterator::nextIterationInStep() { using ret_t = std::optional; + if (m_iterationsInCurrentStep.empty()) + { + return ret_t{}; + } m_iterationsInCurrentStep.pop_front(); if (m_iterationsInCurrentStep.empty()) { @@ -199,16 +203,55 @@ std::optional SeriesIterator::nextStep() auto itEnd = series.iterations.end(); if (it == itEnd) { - *this = end(); - return {this}; + if (status == AdvanceStatus::RANDOMACCESS || + status == AdvanceStatus::OVER) + { + *this = end(); + return {this}; + } + else + { + /* + * Stream still going but there was no iteration found in the + * current IO step? + * Might be a duplicate iteration resulting from appending, + * will skip such iterations and hope to find something in a + * later IO step. No need to finish right now. + */ + m_iterationsInCurrentStep = {}; + m_series->advance(AdvanceMode::ENDSTEP); + } } - ++it; - if (it == itEnd) + else { - *this = end(); - return {this}; + ++it; + + if (it == itEnd) + { + if (status == AdvanceStatus::RANDOMACCESS || + status == AdvanceStatus::OVER) + { + *this = end(); + return {this}; + } + else + { + /* + * Stream still going but there was no iteration found in + * the current IO step? Might be a duplicate iteration + * resulting from appending, will skip such iterations and + * hope to find something in a later IO step. No need to + * finish right now. + */ + m_iterationsInCurrentStep = {}; + m_series->advance(AdvanceMode::ENDSTEP); + } + } + else + { + m_iterationsInCurrentStep = {it->first}; + } } - m_iterationsInCurrentStep = {it->first}; } if (status == AdvanceStatus::OVER) diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 025e30a717..17f4bf57fa 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -6069,9 +6069,46 @@ TEST_CASE("varying_zero_pattern", "[serial]") } } +enum class ParseMode +{ + /* + * Conventional workflow. Just parse the whole thing and yield iterations + * in rising order. + */ + NoSteps, + /* + * The Series is parsed ahead of time upon opening, but it has steps. + * Parsing ahead of time is the conventional workflow to support + * random-access. + * Reading such a Series with the streaming API is only possible if all + * steps are in ascending order, otherwise the openPMD-api has no way of + * associating IO steps with interation indices. + * Reading such a Series with the Streaming API will become possible with + * the Linear read mode to be introduced by #1291. + */ + AheadOfTimeWithoutSnapshot, + /* + * A Series of the BP5 engine is not parsed ahead of time, but step-by-step, + * giving the openPMD-api a way to associate IO steps with iterations. + * No snapshot attribute exists, so the fallback mode is chosen: + * Iterations are returned in ascending order. + * If an IO step returns an iteration whose index is lower than the + * last one, it will be skipped. + * This mode of parsing will be generalized into the Linear read mode with + * PR #1291. + */ + LinearWithoutSnapshot, + /* + * Snapshot attribute exists and dictates the iteration index returned by + * an IO step. Duplicate iterations will be skipped. + */ + WithSnapshot +}; + void append_mode( std::string const &extension, bool variableBased, + ParseMode parseMode, std::string jsonConfig = "{}") { @@ -6179,16 +6216,37 @@ void append_mode( } { Series read(filename, Access::READ_ONLY); - if (variableBased || extension == "bp5") + switch (parseMode) { + case ParseMode::NoSteps: { + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + for (auto const &iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); + } + break; + case ParseMode::LinearWithoutSnapshot: { + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 11}; + for (auto const &iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 7); + } + break; + case ParseMode::WithSnapshot: { // in variable-based encodings, iterations are not parsed ahead of // time but as they go unsigned counter = 0; uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 11}; for (auto const &iteration : read.readIterations()) { - std::cout << "Seeing iteration " << iteration.iterationIndex - << " of series " << filename << std::endl; REQUIRE(iteration.iterationIndex == iterationOrder[counter]); ++counter; } @@ -6196,9 +6254,22 @@ void append_mode( // Cannot do listSeries here because the Series is already drained REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage); } - else - { + break; + case ParseMode::AheadOfTimeWithoutSnapshot: { REQUIRE(read.iterations.size() == 8); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + /* + * Use conventional read API since streaming API is not possible + * without Linear read mode. + * (See also comments inside ParseMode enum). + */ + for (auto const &iteration : read.iterations) + { + REQUIRE(iteration.first == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); /* * Roadmap: for now, reading this should work by ignoring the last * duplicate iteration. @@ -6208,6 +6279,8 @@ void append_mode( */ helper::listSeries(read); } + break; + } } #if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ @@ -6244,16 +6317,47 @@ void append_mode( } { Series read(filename, Access::READ_ONLY); - // in variable-based encodings, iterations are not parsed ahead of - // time but as they go - unsigned counter = 0; - for (auto const &iteration : read.readIterations()) + switch (parseMode) { - REQUIRE(iteration.iterationIndex == counter); - ++counter; + case ParseMode::LinearWithoutSnapshot: { + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10}; + unsigned counter = 0; + for (auto const &iteration : read.readIterations()) + { + REQUIRE( + iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 6); + // Cannot do listSeries here because the Series is already + // drained + REQUIRE_THROWS_AS( + helper::listSeries(read), error::WrongAPIUsage); + } + break; + case ParseMode::WithSnapshot: { + // in variable-based encodings, iterations are not parsed ahead + // of time but as they go + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 5}; + for (auto const &iteration : read.readIterations()) + { + REQUIRE( + iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); + // Cannot do listSeries here because the Series is already + // drained + REQUIRE_THROWS_AS( + helper::listSeries(read), error::WrongAPIUsage); + } + break; + case ParseMode::NoSteps: + case ParseMode::AheadOfTimeWithoutSnapshot: + throw std::runtime_error("Test configured wrong."); + break; } - REQUIRE(counter == 6); - helper::listSeries(read); } } #endif @@ -6263,9 +6367,7 @@ TEST_CASE("append_mode", "[serial]") { for (auto const &t : testedFileExtensions()) { - if (t == "bp" || t == "bp4" || t == "bp5") - { - std::string jsonConfigOld = R"END( + std::string jsonConfigOld = R"END( { "adios2": { @@ -6276,7 +6378,7 @@ TEST_CASE("append_mode", "[serial]") } } })END"; - std::string jsonConfigNew = R"END( + std::string jsonConfigNew = R"END( { "adios2": { @@ -6287,14 +6389,25 @@ TEST_CASE("append_mode", "[serial]") } } })END"; - append_mode(t, false, jsonConfigOld); - append_mode(t, false, jsonConfigNew); - append_mode(t, true, jsonConfigOld); - append_mode(t, true, jsonConfigNew); + if (t == "bp5") + { + append_mode( + t, false, ParseMode::LinearWithoutSnapshot, jsonConfigOld); + append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew); + } + else if (t == "bp" || t == "bp4" || t == "bp5") + { + append_mode( + t, false, ParseMode::AheadOfTimeWithoutSnapshot, jsonConfigOld); + append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew); } else { - append_mode(t, false); + append_mode(t, false, ParseMode::NoSteps); } } }