Skip to content

Commit

Permalink
Preview support for Linear read mode without snapshot attribute
Browse files Browse the repository at this point in the history
Currently only available for BP5 engine, will be generalized into Linear
read mode in #1291.
If the backend does not support the snapshot attribute, then iterate in
ascending order, skipping duplicate and non-linear iteration indices.
Not possible if the Series is parsed ahead of time.
  • Loading branch information
franzpoeschel committed Jul 25, 2022
1 parent 6fa52c8 commit 9f75045
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 29 deletions.
57 changes: 50 additions & 7 deletions src/ReadIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ std::optional<SeriesIterator *> SeriesIterator::nextIterationInStep()
{
using ret_t = std::optional<SeriesIterator *>;

if (m_iterationsInCurrentStep.empty())
{
return ret_t{};
}
m_iterationsInCurrentStep.pop_front();
if (m_iterationsInCurrentStep.empty())
{
Expand Down Expand Up @@ -199,16 +203,55 @@ std::optional<SeriesIterator *> SeriesIterator::nextStep()
auto itEnd = series.iterations.end();
if (it == itEnd)
{
*this = end();
return {this};
if (status == AdvanceStatus::RANDOMACCESS ||
status == AdvanceStatus::OVER)
{
*this = end();
return {this};
}
else
{
/*
* Stream still going but there was no iteration found in the
* current IO step?
* Might be a duplicate iteration resulting from appending,
* will skip such iterations and hope to find something in a
* later IO step. No need to finish right now.
*/
m_iterationsInCurrentStep = {};
m_series->advance(AdvanceMode::ENDSTEP);
}
}
++it;
if (it == itEnd)
else
{
*this = end();
return {this};
++it;

if (it == itEnd)
{
if (status == AdvanceStatus::RANDOMACCESS ||
status == AdvanceStatus::OVER)
{
*this = end();
return {this};
}
else
{
/*
* Stream still going but there was no iteration found in
* the current IO step? Might be a duplicate iteration
* resulting from appending, will skip such iterations and
* hope to find something in a later IO step. No need to
* finish right now.
*/
m_iterationsInCurrentStep = {};
m_series->advance(AdvanceMode::ENDSTEP);
}
}
else
{
m_iterationsInCurrentStep = {it->first};
}
}
m_iterationsInCurrentStep = {it->first};
}

if (status == AdvanceStatus::OVER)
Expand Down
157 changes: 135 additions & 22 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6069,9 +6069,46 @@ TEST_CASE("varying_zero_pattern", "[serial]")
}
}

enum class ParseMode
{
/*
* Conventional workflow. Just parse the whole thing and yield iterations
* in rising order.
*/
NoSteps,
/*
* The Series is parsed ahead of time upon opening, but it has steps.
* Parsing ahead of time is the conventional workflow to support
* random-access.
* Reading such a Series with the streaming API is only possible if all
* steps are in ascending order, otherwise the openPMD-api has no way of
* associating IO steps with interation indices.
* Reading such a Series with the Streaming API will become possible with
* the Linear read mode to be introduced by #1291.
*/
AheadOfTimeWithoutSnapshot,
/*
* A Series of the BP5 engine is not parsed ahead of time, but step-by-step,
* giving the openPMD-api a way to associate IO steps with iterations.
* No snapshot attribute exists, so the fallback mode is chosen:
* Iterations are returned in ascending order.
* If an IO step returns an iteration whose index is lower than the
* last one, it will be skipped.
* This mode of parsing will be generalized into the Linear read mode with
* PR #1291.
*/
LinearWithoutSnapshot,
/*
* Snapshot attribute exists and dictates the iteration index returned by
* an IO step. Duplicate iterations will be skipped.
*/
WithSnapshot
};

void append_mode(
std::string const &extension,
bool variableBased,
ParseMode parseMode,
std::string jsonConfig = "{}")
{

Expand Down Expand Up @@ -6179,26 +6216,60 @@ void append_mode(
}
{
Series read(filename, Access::READ_ONLY);
if (variableBased || extension == "bp5")
switch (parseMode)
{
case ParseMode::NoSteps: {
unsigned counter = 0;
uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11};
for (auto const &iteration : read.readIterations())
{
REQUIRE(iteration.iterationIndex == iterationOrder[counter]);
++counter;
}
REQUIRE(counter == 8);
}
break;
case ParseMode::LinearWithoutSnapshot: {
unsigned counter = 0;
uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 11};
for (auto const &iteration : read.readIterations())
{
REQUIRE(iteration.iterationIndex == iterationOrder[counter]);
++counter;
}
REQUIRE(counter == 7);
}
break;
case ParseMode::WithSnapshot: {
// in variable-based encodings, iterations are not parsed ahead of
// time but as they go
unsigned counter = 0;
uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 11};
for (auto const &iteration : read.readIterations())
{
std::cout << "Seeing iteration " << iteration.iterationIndex
<< " of series " << filename << std::endl;
REQUIRE(iteration.iterationIndex == iterationOrder[counter]);
++counter;
}
REQUIRE(counter == 8);
// Cannot do listSeries here because the Series is already drained
REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage);
}
else
{
break;
case ParseMode::AheadOfTimeWithoutSnapshot: {
REQUIRE(read.iterations.size() == 8);
unsigned counter = 0;
uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11};
/*
* Use conventional read API since streaming API is not possible
* without Linear read mode.
* (See also comments inside ParseMode enum).
*/
for (auto const &iteration : read.iterations)
{
REQUIRE(iteration.first == iterationOrder[counter]);
++counter;
}
REQUIRE(counter == 8);
/*
* Roadmap: for now, reading this should work by ignoring the last
* duplicate iteration.
Expand All @@ -6208,6 +6279,8 @@ void append_mode(
*/
helper::listSeries(read);
}
break;
}
}
#if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \
10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \
Expand Down Expand Up @@ -6244,16 +6317,47 @@ void append_mode(
}
{
Series read(filename, Access::READ_ONLY);
// in variable-based encodings, iterations are not parsed ahead of
// time but as they go
unsigned counter = 0;
for (auto const &iteration : read.readIterations())
switch (parseMode)
{
REQUIRE(iteration.iterationIndex == counter);
++counter;
case ParseMode::LinearWithoutSnapshot: {
uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10};
unsigned counter = 0;
for (auto const &iteration : read.readIterations())
{
REQUIRE(
iteration.iterationIndex == iterationOrder[counter]);
++counter;
}
REQUIRE(counter == 6);
// Cannot do listSeries here because the Series is already
// drained
REQUIRE_THROWS_AS(
helper::listSeries(read), error::WrongAPIUsage);
}
break;
case ParseMode::WithSnapshot: {
// in variable-based encodings, iterations are not parsed ahead
// of time but as they go
unsigned counter = 0;
uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 5};
for (auto const &iteration : read.readIterations())
{
REQUIRE(
iteration.iterationIndex == iterationOrder[counter]);
++counter;
}
REQUIRE(counter == 8);
// Cannot do listSeries here because the Series is already
// drained
REQUIRE_THROWS_AS(
helper::listSeries(read), error::WrongAPIUsage);
}
break;
case ParseMode::NoSteps:
case ParseMode::AheadOfTimeWithoutSnapshot:
throw std::runtime_error("Test configured wrong.");
break;
}
REQUIRE(counter == 6);
helper::listSeries(read);
}
}
#endif
Expand All @@ -6263,9 +6367,7 @@ TEST_CASE("append_mode", "[serial]")
{
for (auto const &t : testedFileExtensions())
{
if (t == "bp" || t == "bp4" || t == "bp5")
{
std::string jsonConfigOld = R"END(
std::string jsonConfigOld = R"END(
{
"adios2":
{
Expand All @@ -6276,7 +6378,7 @@ TEST_CASE("append_mode", "[serial]")
}
}
})END";
std::string jsonConfigNew = R"END(
std::string jsonConfigNew = R"END(
{
"adios2":
{
Expand All @@ -6287,14 +6389,25 @@ TEST_CASE("append_mode", "[serial]")
}
}
})END";
append_mode(t, false, jsonConfigOld);
append_mode(t, false, jsonConfigNew);
append_mode(t, true, jsonConfigOld);
append_mode(t, true, jsonConfigNew);
if (t == "bp5")
{
append_mode(
t, false, ParseMode::LinearWithoutSnapshot, jsonConfigOld);
append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew);
append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld);
append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew);
}
else if (t == "bp" || t == "bp4" || t == "bp5")
{
append_mode(
t, false, ParseMode::AheadOfTimeWithoutSnapshot, jsonConfigOld);
append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew);
append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld);
append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew);
}
else
{
append_mode(t, false);
append_mode(t, false, ParseMode::NoSteps);
}
}
}
Expand Down

0 comments on commit 9f75045

Please sign in to comment.