From a83bc225b5c347d07e11b38cca75ff8970ca3f30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 25 Oct 2022 19:47:29 +0200 Subject: [PATCH] Mapping between ADIOS steps and openPMD iterations (#949) * Backend additions 1) New streaming status: RANDOM_ACCESS, for non-streaming situations 2) Variable attributes, to be written only if the backend has support for steps * Writing changes: Write current step(s) to snapshot attribute Only set snapshot attribute if Iteration is not yet written For v-based iteration encoding, the snapshot attribute is already being set before this PR. Just add a comment there. Also add missing includes Co-authored-by: Axel Huebl * Reading changes: Use snapshot attribute This means that the snapshot attribute, if present, is used for accessing iterations inside `series.readIterations()`. Fallback to the old behavior (linear progression through iterations) if the attribute is not found. Variable-b. encoding: Allow several (equivalent) iterations per step This means that a single step can be marked by /data/snapshot to represent iterations 0,10,20,30 at the same time. The underlying data is the same, but the API will treat it as 4 times a different iteration with equivalent content. Avoid const_cast by introducing a parsing state and use that when re-parsing. Skip repeated iterations that occur in Append mode Before the explicit iteration-step mapping, these were not seen by reading procedures at all. Now they are, so we skip the second instance. Better error message when calling readIterations() too late This commit includes some refactoring 1. Remove recursion of operator++(), this leads to constant memory usage rather than filling the stack at some point 2. Extract subroutines from operator++() 3. 
Steal some refactoring that solved some bugs on topic-read-leniently, so it stands to reason that we should apply it here already * Testing In the tests, don't try to read the series with listSeries after already having fully drained it Combined test: append mode and weird iteration order Deactivate troublesome Schema 2021 Append test * Add -wd1011 flag to Icc workflow * Fix priority of JSON/envvar config for ADIOS2 schema * Preview support for Linear read mode without snapshot attribute Currently only available for BP5 engine, will be generalized into Linear read mode in #1291. If the backend does not support the snapshot attribute, then iterate in ascending order, skipping duplicate and non-linear iteration indices. Not possible if the Series is parsed ahead of time. * Test edge cases of snapshot attribute Co-authored-by: Axel Huebl --- .github/workflows/intel.yml | 7 +- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 15 +- include/openPMD/IO/AbstractIOHandler.hpp | 23 ++ include/openPMD/IO/AbstractIOHandlerImpl.hpp | 15 +- include/openPMD/IO/IOTask.hpp | 8 + include/openPMD/Iteration.hpp | 50 ++- include/openPMD/ReadIterations.hpp | 38 ++- include/openPMD/Series.hpp | 29 +- include/openPMD/Streaming.hpp | 5 +- include/openPMD/backend/Attributable.hpp | 4 +- include/openPMD/backend/Container.hpp | 8 +- src/IO/ADIOS/ADIOS2IOHandler.cpp | 21 +- src/IO/ADIOS/CommonADIOS1IOHandler.cpp | 5 + src/IO/HDF5/HDF5IOHandler.cpp | 5 + src/IO/JSON/JSONIOHandlerImpl.cpp | 5 + src/Iteration.cpp | 97 ++++-- src/ReadIterations.cpp | 309 +++++++++++++++---- src/Series.cpp | 269 +++++++++++++--- test/JSONTest.cpp | 87 ++++++ test/ParallelIOTest.cpp | 29 +- test/SerialIOTest.cpp | 259 +++++++++++++--- test/python/unittest/API/APITest.py | 5 +- 22 files changed, 1093 insertions(+), 200 deletions(-) diff --git a/.github/workflows/intel.yml b/.github/workflows/intel.yml index 2b3cea6233..39a3d51e28 100644 --- a/.github/workflows/intel.yml +++ b/.github/workflows/intel.yml @@ -17,7 +17,12 
@@ jobs: run: | sudo .github/workflows/dependencies/install_icc - name: Build - env: {CXXFLAGS: -Werror} + # Due to compiler bugs in Intel compiler, we need to disable warning 1011 + # (missing return value), otherwise `if constexpr` functions + # don't compile. + # See https://community.intel.com/t5/Intel-C-Compiler/quot-if-constexpr-quot-and-quot-missing-return-statement-quot-in/td-p/1154551 + # Using a local pragma does not work due to the reasons stated there. + env: {CXXFLAGS: -Werror -wd1011} run: | set +e; source /opt/intel/oneapi/setvars.sh; set -e share/openPMD/download_samples.sh build diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index d3ccc93258..bc8ea80ad5 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -1155,13 +1155,6 @@ namespace detail */ void invalidateVariablesMap(); - private: - ADIOS2IOHandlerImpl *m_impl; - std::optional m_engine; //! ADIOS engine - /** - * The ADIOS2 engine type, to be passed to adios2::IO::SetEngine - */ - std::string m_engineType; /* * streamStatus is NoStream for file-based ADIOS engines. * This is relevant for the method BufferedActions::requireActiveStep, @@ -1253,6 +1246,14 @@ namespace detail }; StreamStatus streamStatus = StreamStatus::OutsideOfStep; + private: + ADIOS2IOHandlerImpl *m_impl; + std::optional m_engine; //! ADIOS engine + /** + * The ADIOS2 engine type, to be passed to adios2::IO::SetEngine + */ + std::string m_engineType; + /** * See documentation for StreamStatus::Parsing. 
* Will be set true under the circumstance described there in order to diff --git a/include/openPMD/IO/AbstractIOHandler.hpp b/include/openPMD/IO/AbstractIOHandler.hpp index 4f6916ae55..7627b66524 100644 --- a/include/openPMD/IO/AbstractIOHandler.hpp +++ b/include/openPMD/IO/AbstractIOHandler.hpp @@ -121,6 +121,28 @@ namespace internal FlushParams const defaultFlushParams{}; struct ParsedFlushParams; + + /** + * Some parts of the openPMD object model are read-only when accessing + * a Series in Access::READ_ONLY mode, notably Containers and Attributes. + * They are filled at parse time and not modified afterwards. + * Such state-changing operations are hence allowed under either of two + * conditions: + * 1) The Series is opened in an open mode that allows writing in any way. + * (Currently any but Access::READ_ONLY). + * 2) The Series is in Parsing state. This way, modifying the open mode + * during parsing can be avoided. + */ + enum class SeriesStatus : unsigned char + { + Default, ///< Mutability of objects in the openPMD object model is + ///< determined by the open mode (Access enum), normal state in + ///< which the user interacts with the Series. + Parsing ///< All objects in the openPMD object model are temporarily + ///< mutable to allow inserting newly-parsed data. + ///< Special state only active while internal routines are + ///< running. + }; } // namespace internal /** Interface for communicating between logical and physically persistent data. @@ -192,6 +214,7 @@ class AbstractIOHandler // why do these need to be separate? 
Access const m_backendAccess; Access const m_frontendAccess; + internal::SeriesStatus m_seriesStatus = internal::SeriesStatus::Default; std::queue m_work; }; // AbstractIOHandler diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 55ff021d06..04820a28d4 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -252,8 +252,10 @@ class AbstractIOHandlerImpl * The advance mode is determined by parameters.mode. * The return status code shall be stored as parameters.status. */ - virtual void advance(Writable *, Parameter &) - {} + virtual void advance(Writable *, Parameter &parameters) + { + *parameters.status = AdvanceStatus::RANDOMACCESS; + } /** Close an openPMD group. * @@ -488,8 +490,13 @@ class AbstractIOHandlerImpl * datatype parameters.dtype. Any existing attribute with the same name * should be overwritten. If possible, only the value should be changed if * the datatype stays the same. The attribute should be written to physical - * storage after the operation completes successfully. All datatypes of - * Datatype should be supported in a type-safe way. + * storage after the operation completes successfully. If the parameter + * changesOverSteps is true, then the attribute must be able to hold + * different values across IO steps. If the backend does not support IO + * steps in such a way, the attribute should not be written. (IO steps are + * an optional backend feature and the frontend must implement fallback + * measures in such a case) All datatypes of Datatype should be supported in + * a type-safe way.
*/ virtual void writeAttribute(Writable *, Parameter const &) = 0; diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 3ba7d09e24..88c7d0380b 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -537,6 +537,7 @@ struct OPENPMDAPI_EXPORT Parameter : AbstractParameter() , name(p.name) , dtype(p.dtype) + , changesOverSteps(p.changesOverSteps) , resource(p.resource) {} @@ -548,6 +549,13 @@ struct OPENPMDAPI_EXPORT Parameter std::string name = ""; Datatype dtype = Datatype::UNDEFINED; + /* + * If true, this attribute changes across IO steps. + * It should only be written in backends that support IO steps, + * otherwise writing should be skipped. + * The frontend is responsible for handling both situations. + */ + bool changesOverSteps = false; Attribute::resource resource; }; diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index ae58e787e1..2d14313cfa 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -28,7 +28,10 @@ #include "openPMD/backend/Attributable.hpp" #include "openPMD/backend/Container.hpp" +#include +#include #include +#include namespace openPMD { @@ -282,14 +285,57 @@ class Iteration : public Attributable void readGorVBased(std::string const &groupPath, bool beginStep); void read_impl(std::string const &groupPath); + /** + * Status after beginning an IO step. Currently includes: + * * The advance status (OK, OVER, RANDOMACCESS) + * * The opened iterations, in case the snapshot attribute is found + */ + struct BeginStepStatus + { + using AvailableIterations_t = std::optional >; + + AdvanceStatus stepStatus{}; + /* + * If the iteration attribute `snapshot` is present, the value of that + * attribute. Otherwise empty. + */ + AvailableIterations_t iterationsInOpenedStep; + + /* + * Most of the time, the AdvanceStatus part of this struct is what we + * need, so let's make it easy to access. 
+ */ + inline operator AdvanceStatus() const + { + return stepStatus; + } + + /* + * Support for std::tie() + */ + inline operator std::tuple() + { + return std::tuple{ + stepStatus, iterationsInOpenedStep}; + } + }; + /** * @brief Begin an IO step on the IO file (or file-like object) * containing this iteration. In case of group-based iteration * layout, this will be the complete Series. * - * @return AdvanceStatus + * @return BeginStepStatus + */ + BeginStepStatus beginStep(bool reread); + + /* + * Iteration-independent variant for beginStep(). + * Useful in group-based iteration encoding where the Iteration will only + * be known after opening the step. */ - AdvanceStatus beginStep(bool reread); + static BeginStepStatus + beginStep(std::optional thisObject, Series &series, bool reread); /** * @brief End an IO step on the IO file (or file-like object) diff --git a/include/openPMD/ReadIterations.hpp b/include/openPMD/ReadIterations.hpp index 473a4fae36..c5b2720dce 100644 --- a/include/openPMD/ReadIterations.hpp +++ b/include/openPMD/ReadIterations.hpp @@ -23,6 +23,8 @@ #include "openPMD/Iteration.hpp" #include "openPMD/Series.hpp" +#include +#include #include namespace openPMD @@ -54,7 +56,8 @@ class SeriesIterator using maybe_series_t = std::optional; maybe_series_t m_series; - iteration_index_t m_currentIteration = 0; + std::deque m_iterationsInCurrentStep; + uint64_t m_currentIteration{}; public: //! construct the end() iterator @@ -71,6 +74,39 @@ class SeriesIterator bool operator!=(SeriesIterator const &other) const; static SeriesIterator end(); + +private: + inline bool setCurrentIteration() + { + if (m_iterationsInCurrentStep.empty()) + { + std::cerr << "[ReadIterations] Encountered a step without " + "iterations. Closing the Series." 
+ << std::endl; + *this = end(); + return false; + } + m_currentIteration = *m_iterationsInCurrentStep.begin(); + return true; + } + + inline std::optional peekCurrentIteration() + { + if (m_iterationsInCurrentStep.empty()) + { + return std::nullopt; + } + else + { + return {*m_iterationsInCurrentStep.begin()}; + } + } + + std::optional nextIterationInStep(); + + std::optional nextStep(); + + std::optional loopBody(); }; /** diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index cda0956b52..362001605e 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -37,8 +37,11 @@ #include #endif +#include +#include #include #include +#include #include // expose private and protected members for invasive testing @@ -82,6 +85,12 @@ namespace internal * the same instance. */ std::optional m_writeIterations; + /** + * For writing: Remember which iterations have been written in the + * currently active output step. Use this later when writing the + * snapshot attribute. + */ + std::set m_currentlyActiveIterations; /** * Needed if reading a single iteration of a file-based series. * Users may specify the concrete filename of one iteration instead of @@ -576,8 +585,10 @@ OPENPMD_private * Note on re-parsing of a Series: * If init == false, the parsing process will seek for new * Iterations/Records/Record Components etc. + * If series.iterations contains the attribute `snapshot`, returns its + * value. */ - void readGorVBased(bool init = true); + std::optional > readGorVBased(bool init = true); void readBase(); std::string iterationFilename(uint64_t i); @@ -627,6 +638,22 @@ OPENPMD_private internal::AttributableData &file, iterations_iterator it, Iteration &iteration); + + AdvanceStatus advance(AdvanceMode mode); + + /** + * @brief Called at the end of an IO step to store the iterations defined + * in the IO step to the snapshot attribute. + * + * @param doFlush If true, flush the IO handler. 
+ */ + void flushStep(bool doFlush); + + /* + * Returns the current content of the /data/snapshot attribute. + * (We could also add this to the public API some time) + */ + std::optional > currentSnapshot() const; }; // Series } // namespace openPMD diff --git a/include/openPMD/Streaming.hpp b/include/openPMD/Streaming.hpp index 7bc84341aa..94854662fa 100644 --- a/include/openPMD/Streaming.hpp +++ b/include/openPMD/Streaming.hpp @@ -19,8 +19,9 @@ namespace openPMD */ enum class AdvanceStatus : unsigned char { - OK, /* stream goes on */ - OVER /* stream is over */ + OK, ///< stream goes on + OVER, ///< stream is over + RANDOMACCESS ///< there is no stream, it will never be over }; /** diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 1c9dda8429..8d34ee7935 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -463,7 +463,9 @@ inline bool Attributable::setAttributeImpl( internal::attr_value_check(key, value, setAttributeMode); auto &attri = get(); - if (IOHandler() && Access::READ_ONLY == IOHandler()->m_frontendAccess) + if (IOHandler() && + IOHandler()->m_seriesStatus == internal::SeriesStatus::Default && + Access::READ_ONLY == IOHandler()->m_frontendAccess) { auxiliary::OutOfRangeMsg const out_of_range_msg( "Attribute", "can not be set (read-only)."); diff --git a/include/openPMD/backend/Container.hpp b/include/openPMD/backend/Container.hpp index 9dd163ecae..8db82c69f0 100644 --- a/include/openPMD/backend/Container.hpp +++ b/include/openPMD/backend/Container.hpp @@ -288,7 +288,9 @@ class Container : public Attributable return it->second; else { - if (Access::READ_ONLY == IOHandler()->m_frontendAccess) + if (IOHandler()->m_seriesStatus != + internal::SeriesStatus::Parsing && + Access::READ_ONLY == IOHandler()->m_frontendAccess) { auxiliary::OutOfRangeMsg const out_of_range_msg; throw std::out_of_range(out_of_range_msg(key)); @@ -321,7 +323,9 @@ class Container : 
public Attributable return it->second; else { - if (Access::READ_ONLY == IOHandler()->m_frontendAccess) + if (IOHandler()->m_seriesStatus != + internal::SeriesStatus::Parsing && + Access::READ_ONLY == IOHandler()->m_frontendAccess) { auxiliary::OutOfRangeMsg out_of_range_msg; throw std::out_of_range(out_of_range_msg(key)); diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index fcdfffc1f8..95362b2687 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -134,6 +134,10 @@ void ADIOS2IOHandlerImpl::init(json::TracingJSON cfg) m_engineType.end(), m_engineType.begin(), [](unsigned char c) { return std::tolower(c); }); + + // environment-variable based configuration + m_schema = auxiliary::getEnvNum("OPENPMD2_ADIOS2_SCHEMA", m_schema); + if (cfg.json().contains("adios2")) { m_config = cfg["adios2"]; @@ -179,8 +183,6 @@ void ADIOS2IOHandlerImpl::init(json::TracingJSON cfg) defaultOperators = std::move(operators.value()); } } - // environment-variable based configuration - m_schema = auxiliary::getEnvNum("OPENPMD2_ADIOS2_SCHEMA", m_schema); } std::optional> @@ -815,6 +817,11 @@ void ADIOS2IOHandlerImpl::writeAttribute( switch (attributeLayout()) { case AttributeLayout::ByAdiosAttributes: + if (parameters.changesOverSteps) + { + // cannot do this + return; + } switchType( parameters.dtype, this, writable, parameters); break; @@ -829,6 +836,13 @@ void ADIOS2IOHandlerImpl::writeAttribute( auto prefix = filePositionToString(pos); auto &filedata = getFileData(file, IfFileNotOpen::ThrowError); + if (parameters.changesOverSteps && + filedata.streamStatus == + detail::BufferedActions::StreamStatus::NoStream) + { + // cannot do this + return; + } filedata.requireActiveStep(); filedata.invalidateAttributesMap(); m_dirty.emplace(std::move(file)); @@ -2802,6 +2816,7 @@ namespace detail "[ADIOS2] Operation requires active step but no step is " "left."); case AdvanceStatus::OK: + case AdvanceStatus::RANDOMACCESS: // pass 
break; } @@ -3005,7 +3020,7 @@ namespace detail m_IO.DefineAttribute( ADIOS2Defaults::str_usesstepsAttribute, 0); flush({FlushLevel::UserFlush}, /* writeAttributes = */ false); - return AdvanceStatus::OK; + return AdvanceStatus::RANDOMACCESS; } /* diff --git a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp index 3dbee3e2db..3a13dc7fc8 100644 --- a/src/IO/ADIOS/CommonADIOS1IOHandler.cpp +++ b/src/IO/ADIOS/CommonADIOS1IOHandler.cpp @@ -1129,6 +1129,11 @@ template void CommonADIOS1IOHandlerImpl::writeAttribute( Writable *writable, Parameter const &parameters) { + if (parameters.changesOverSteps) + { + // cannot do this + return; + } if (m_handler->m_backendAccess == Access::READ_ONLY) throw std::runtime_error( "[ADIOS1] Writing an attribute in a file opened as read only is " diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index 94dce55c1c..01979d8071 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -1328,6 +1328,11 @@ void HDF5IOHandlerImpl::writeDataset( void HDF5IOHandlerImpl::writeAttribute( Writable *writable, Parameter const &parameters) { + if (parameters.changesOverSteps) + { + // cannot do this + return; + } if (m_handler->m_backendAccess == Access::READ_ONLY) throw std::runtime_error( "[HDF5] Writing an attribute in a file opened as read only is not " diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 062c736433..272478789b 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -797,6 +797,11 @@ void JSONIOHandlerImpl::writeDataset( void JSONIOHandlerImpl::writeAttribute( Writable *writable, Parameter const &parameter) { + if (parameter.changesOverSteps) + { + // cannot do this + return; + } if (m_handler->m_backendAccess == Access::READ_ONLY) { throw std::runtime_error( diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 575610ea16..5ef4ac0274 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@
-282,6 +282,13 @@ void Iteration::flushVariableBased( Parameter pOpen; pOpen.path = ""; IOHandler()->enqueue(IOTask(this, pOpen)); + /* + * In v-based encoding, the snapshot attribute must always be written, + * so don't set the `changesOverSteps` flag of the IOTask here. + * Reason: Even in backends that don't support changing attributes, + * variable-based iteration encoding can be used to write one single + * iteration. Then, this attribute determines which iteration it is. + */ this->setAttribute("snapshot", i); } @@ -566,49 +573,92 @@ void Iteration::read_impl(std::string const &groupPath) readAttributes(ReadMode::FullyReread); } -AdvanceStatus Iteration::beginStep(bool reread) +auto Iteration::beginStep(bool reread) -> BeginStepStatus { - using IE = IterationEncoding; + BeginStepStatus res; auto series = retrieveSeries(); + return beginStep({*this}, series, reread); +} + +auto Iteration::beginStep( + std::optional thisObject, Series &series, bool reread) + -> BeginStepStatus +{ + BeginStepStatus res; + using IE = IterationEncoding; // Initialize file with this to quiet warnings // The following switch is comprehensive internal::AttributableData *file = nullptr; switch (series.iterationEncoding()) { case IE::fileBased: - file = &Attributable::get(); + if (thisObject.has_value()) + { + file = &static_cast(*thisObject).get(); + } + else + { + throw error::Internal( + "Advancing a step in file-based iteration encoding is " + "iteration-specific."); + } break; case IE::groupBased: case IE::variableBased: file = &series.get(); break; } - AdvanceStatus status = series.advance( - AdvanceMode::BEGINSTEP, *file, series.indexOf(*this), *this); - if (status != AdvanceStatus::OK) + + AdvanceStatus status; + if (thisObject.has_value()) + { + status = series.advance( + AdvanceMode::BEGINSTEP, + *file, + series.indexOf(*thisObject), + *thisObject); + } + else + { + status = series.advance(AdvanceMode::BEGINSTEP); + } + + switch (status) { - return status; + case 
AdvanceStatus::OVER: + res.stepStatus = status; + return res; + case AdvanceStatus::OK: + case AdvanceStatus::RANDOMACCESS: + break; } // re-read -> new datasets might be available - if (reread && + auto IOHandl = series.IOHandler(); + if (reread && status != AdvanceStatus::RANDOMACCESS && (series.iterationEncoding() == IE::groupBased || series.iterationEncoding() == IE::variableBased) && - (this->IOHandler()->m_frontendAccess == Access::READ_ONLY || - this->IOHandler()->m_frontendAccess == Access::READ_WRITE)) + (IOHandl->m_frontendAccess == Access::READ_ONLY || + IOHandl->m_frontendAccess == Access::READ_WRITE)) { - switch (IOHandler()->m_frontendAccess) + switch (IOHandl->m_frontendAccess) { case Access::READ_ONLY: case Access::READ_WRITE: { bool previous = series.iterations.written(); series.iterations.written() = false; - auto oldType = this->IOHandler()->m_frontendAccess; - auto newType = - const_cast(&this->IOHandler()->m_frontendAccess); - *newType = Access::READ_WRITE; - series.readGorVBased(false); - *newType = oldType; + auto oldStatus = IOHandl->m_seriesStatus; + IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing; + try + { + res.iterationsInOpenedStep = series.readGorVBased(false); + } + catch (...) 
+ { + IOHandl->m_seriesStatus = oldStatus; + throw; + } + IOHandl->m_seriesStatus = oldStatus; series.iterations.written() = previous; break; } @@ -619,7 +669,8 @@ AdvanceStatus Iteration::beginStep(bool reread) } } - return status; + res.stepStatus = status; + return res; } void Iteration::endStep() @@ -641,6 +692,7 @@ void Iteration::endStep() } // @todo filebased check series.advance(AdvanceMode::ENDSTEP, *file, series.indexOf(*this), *this); + series.get().m_currentlyActiveIterations.clear(); } StepStatus Iteration::getStepStatus() @@ -724,9 +776,8 @@ void Iteration::runDeferredParseAccess() } auto const &deferred = it.m_deferredParseAccess.value(); - auto oldAccess = IOHandler()->m_frontendAccess; - auto newAccess = const_cast(&IOHandler()->m_frontendAccess); - *newAccess = Access::READ_WRITE; + auto oldStatus = IOHandler()->m_seriesStatus; + IOHandler()->m_seriesStatus = internal::SeriesStatus::Parsing; try { if (deferred.fileBased) @@ -743,12 +794,12 @@ void Iteration::runDeferredParseAccess() { // reset this thing it.m_deferredParseAccess = std::optional(); - *newAccess = oldAccess; + IOHandler()->m_seriesStatus = oldStatus; throw; } // reset this thing it.m_deferredParseAccess = std::optional(); - *newAccess = oldAccess; + IOHandler()->m_seriesStatus = oldStatus; break; } case Access::CREATE: diff --git a/src/ReadIterations.cpp b/src/ReadIterations.cpp index b677d97535..b567aa0ff1 100644 --- a/src/ReadIterations.cpp +++ b/src/ReadIterations.cpp @@ -37,19 +37,26 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) *this = end(); return; } + else if ( + it->second.get().m_closed == internal::CloseStatus::ClosedInBackend) + { + throw error::WrongAPIUsage( + "Trying to call Series::readIterations() on a (partially) read " + "Series."); + } else { - auto openIteration = [&it]() { + auto openIteration = [](Iteration &iteration) { /* * @todo * Is that really clean? * Use case: See Python ApiTest testListSeries: * Call listSeries twice. 
*/ - if (it->second.get().m_closed != + if (iteration.get().m_closed != internal::CloseStatus::ClosedInBackend) { - it->second.open(); + iteration.open(); } }; AdvanceStatus status{}; @@ -62,101 +69,289 @@ SeriesIterator::SeriesIterator(Series series) : m_series(std::move(series)) * so do that now. There is only one step per file, so beginning * the step after parsing the file is ok. */ - openIteration(); + + openIteration(series.iterations.begin()->second); status = it->second.beginStep(/* reread = */ true); + for (auto const &pair : m_series.value().iterations) + { + m_iterationsInCurrentStep.push_back(pair.first); + } break; case IterationEncoding::groupBased: - case IterationEncoding::variableBased: + case IterationEncoding::variableBased: { /* * In group-based iteration layout, we have definitely already had * access to the file until now. Better to begin a step right away, * otherwise we might get another step's data. */ - status = it->second.beginStep(/* reread = */ true); - openIteration(); + Iteration::BeginStepStatus::AvailableIterations_t + availableIterations; + std::tie(status, availableIterations) = + it->second.beginStep(/* reread = */ true); + /* + * In random-access mode, do not use the information read in the + * `snapshot` attribute, instead simply go through iterations + * one by one in ascending order (fallback implementation in the + * second if branch). + */ + if (availableIterations.has_value() && + status != AdvanceStatus::RANDOMACCESS) + { + m_iterationsInCurrentStep = availableIterations.value(); + if (!m_iterationsInCurrentStep.empty()) + { + openIteration( + series.iterations.at(m_iterationsInCurrentStep.at(0))); + } + } + else if (!series.iterations.empty()) + { + /* + * Fallback implementation: Assume that each step corresponds + * with an iteration in ascending order. 
+ */ + m_iterationsInCurrentStep = {series.iterations.begin()->first}; + openIteration(series.iterations.begin()->second); + } + else + { + // this is a no-op, but let's keep it explicit + m_iterationsInCurrentStep = {}; + } + break; } + } + if (status == AdvanceStatus::OVER) { *this = end(); return; } + if (!setCurrentIteration()) + { + *this = end(); + return; + } it->second.setStepStatus(StepStatus::DuringStep); } - m_currentIteration = it->first; } -SeriesIterator &SeriesIterator::operator++() +std::optional SeriesIterator::nextIterationInStep() { - if (!m_series.has_value()) + using ret_t = std::optional; + + if (m_iterationsInCurrentStep.empty()) + { + return ret_t{}; + } + m_iterationsInCurrentStep.pop_front(); + if (m_iterationsInCurrentStep.empty()) + { + return ret_t{}; + } + auto oldIterationIndex = m_currentIteration; + m_currentIteration = *m_iterationsInCurrentStep.begin(); + auto &series = m_series.value(); + + switch (series.iterationEncoding()) + { + case IterationEncoding::groupBased: + case IterationEncoding::variableBased: { + auto begin = series.iterations.find(oldIterationIndex); + auto end = begin; + ++end; + series.flush_impl( + begin, + end, + {FlushLevel::UserFlush}, + /* flushIOHandler = */ true); + + series.iterations[m_currentIteration].open(); + return {this}; + } + case IterationEncoding::fileBased: + series.iterations[m_currentIteration].open(); + series.iterations[m_currentIteration].beginStep(/* reread = */ true); + return {this}; + } + throw std::runtime_error("Unreachable!"); +} + +std::optional SeriesIterator::nextStep() +{ + // since we are in group-based iteration layout, it does not + // matter which iteration we begin a step upon + AdvanceStatus status; + Iteration::BeginStepStatus::AvailableIterations_t availableIterations; + std::tie(status, availableIterations) = + Iteration::beginStep({}, *m_series, /* reread = */ true); + + if (availableIterations.has_value() && + status != AdvanceStatus::RANDOMACCESS) + { + 
m_iterationsInCurrentStep = availableIterations.value(); + } + else + { + /* + * Fallback implementation: Assume that each step corresponds + * with an iteration in ascending order. + */ + auto &series = m_series.value(); + auto it = series.iterations.find(m_currentIteration); + auto itEnd = series.iterations.end(); + if (it == itEnd) + { + if (status == AdvanceStatus::RANDOMACCESS || + status == AdvanceStatus::OVER) + { + *this = end(); + return {this}; + } + else + { + /* + * Stream still going but there was no iteration found in the + * current IO step? + * Might be a duplicate iteration resulting from appending, + * will skip such iterations and hope to find something in a + * later IO step. No need to finish right now. + */ + m_iterationsInCurrentStep = {}; + m_series->advance(AdvanceMode::ENDSTEP); + } + } + else + { + ++it; + + if (it == itEnd) + { + if (status == AdvanceStatus::RANDOMACCESS || + status == AdvanceStatus::OVER) + { + *this = end(); + return {this}; + } + else + { + /* + * Stream still going but there was no iteration found in + * the current IO step? Might be a duplicate iteration + * resulting from appending, will skip such iterations and + * hope to find something in a later IO step. No need to + * finish right now. 
+ */ + m_iterationsInCurrentStep = {}; + m_series->advance(AdvanceMode::ENDSTEP); + } + } + else + { + m_iterationsInCurrentStep = {it->first}; + } + } + } + + if (status == AdvanceStatus::OVER) { *this = end(); - return *this; + return {this}; } + + return {this}; +} + +std::optional SeriesIterator::loopBody() +{ Series &series = m_series.value(); auto &iterations = series.iterations; - auto &currentIteration = iterations[m_currentIteration]; - if (!currentIteration.closed()) + + /* + * Might not be present because parsing might have failed in previous step + */ + if (iterations.contains(m_currentIteration)) { - currentIteration.close(); + auto &currentIteration = iterations[m_currentIteration]; + if (!currentIteration.closed()) + { + currentIteration.close(); + } } - switch (series.iterationEncoding()) + + auto guardReturn = + [&iterations]( + auto const &option) -> std::optional { + if (!option.has_value() || *option.value() == end()) + { + return option; + } + auto currentIterationIndex = option.value()->peekCurrentIteration(); + if (!currentIterationIndex.has_value()) + { + return std::nullopt; + } + auto iteration = iterations.at(currentIterationIndex.value()); + if (iteration.get().m_closed != internal::CloseStatus::ClosedInBackend) + { + iteration.open(); + option.value()->setCurrentIteration(); + return option; + } + else + { + // we had this iteration already, skip it + iteration.endStep(); + return std::nullopt; // empty, go into next iteration + } + }; + { - using IE = IterationEncoding; - case IE::groupBased: - case IE::variableBased: { - // since we are in group-based iteration layout, it does not - // matter which iteration we begin a step upon - AdvanceStatus status{}; - status = currentIteration.beginStep(/* reread = */ true); - if (status == AdvanceStatus::OVER) + auto optionallyAStep = nextIterationInStep(); + if (optionallyAStep.has_value()) { - *this = end(); - return *this; + return guardReturn(optionallyAStep); } -
currentIteration.setStepStatus(StepStatus::DuringStep); - break; - } - default: - break; } - auto it = iterations.find(m_currentIteration); - auto itEnd = iterations.end(); - if (it == itEnd) + + // The currently active iterations have been exhausted. + // Now see if there are further iterations to be found. + + if (series.iterationEncoding() == IterationEncoding::fileBased) { + // this one is handled above, stream is over once it proceeds to here *this = end(); - return *this; + return {this}; } - ++it; - if (it == itEnd) + + auto option = nextStep(); + return guardReturn(option); +} + +SeriesIterator &SeriesIterator::operator++() +{ + if (!m_series.has_value()) { *this = end(); return *this; } - m_currentIteration = it->first; - if (it->second.get().m_closed != internal::CloseStatus::ClosedInBackend) + std::optional res; + /* + * loopBody() might return an empty option to indicate a skipped iteration. + * Loop until it returns something real for us. + */ + do { - it->second.open(); - } - switch (series.iterationEncoding()) + res = loopBody(); + } while (!res.has_value()); + + auto resvalue = res.value(); + if (*resvalue != end()) { - using IE = IterationEncoding; - case IE::fileBased: { - auto &iteration = series.iterations[m_currentIteration]; - AdvanceStatus status{}; - status = iteration.beginStep(/* reread = */ true); - if (status == AdvanceStatus::OVER) - { - *this = end(); - return *this; - } - iteration.setStepStatus(StepStatus::DuringStep); - break; - } - default: - break; + (**resvalue).setStepStatus(StepStatus::DuringStep); } - return *this; + return *resvalue; } IndexedIteration SeriesIterator::operator*() diff --git a/src/Series.cpp b/src/Series.cpp index 6387258cf6..7d87e88e13 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -577,28 +577,34 @@ Given file pattern: ')END" case Access::READ_WRITE: { /* Allow creation of values in Containers and setting of Attributes * Would throw for Access::READ_ONLY */ - auto oldType = 
IOHandler()->m_frontendAccess; - auto newType = const_cast(&IOHandler()->m_frontendAccess); - *newType = Access::READ_WRITE; + IOHandler()->m_seriesStatus = internal::SeriesStatus::Parsing; - if (input->iterationEncoding == IterationEncoding::fileBased) - readFileBased(); - else - readGorVBased(); - - if (series.iterations.empty()) + try { - /* Access::READ_WRITE can be used to create a new Series - * allow setting attributes in that case */ - written() = false; + if (input->iterationEncoding == IterationEncoding::fileBased) + readFileBased(); + else + readGorVBased(); - initDefaults(input->iterationEncoding); - setIterationEncoding(input->iterationEncoding); + if (series.iterations.empty()) + { + /* Access::READ_WRITE can be used to create a new Series + * allow setting attributes in that case */ + written() = false; - written() = true; + initDefaults(input->iterationEncoding); + setIterationEncoding(input->iterationEncoding); + + written() = true; + } + } + catch (...) + { + IOHandler()->m_seriesStatus = internal::SeriesStatus::Default; + throw; } - *newType = oldType; + IOHandler()->m_seriesStatus = internal::SeriesStatus::Default; break; } case Access::CREATE: { @@ -732,11 +738,16 @@ void Series::flushFileBased( switch (openIterationIfDirty(it->first, it->second)) { using IO = IterationOpened; + case IO::RemainsClosed: + // we might need to proceed further if the close status is + // ClosedInFrontend + // hence no continue here + // otherwise, we might forget to close files physically + break; case IO::HasBeenOpened: + // continue below it->second.flush(flushParams); break; - case IO::RemainsClosed: - break; } // Phase 2 @@ -769,13 +780,21 @@ void Series::flushFileBased( case IO::HasBeenOpened: { /* as there is only one series, * emulate the file belonging to each iteration as not yet - * written + * written, even if the iteration itself is already written + * (to ensure that the Series gets reassociated with the + * current iteration) */ written() = false; 
series.iterations.written() = false; dirty() |= it->second.dirty(); std::string filename = iterationFilename(it->first); + + if (!it->second.written()) + { + series.m_currentlyActiveIterations.emplace(it->first); + } + it->second.flushFileBased(filename, it->first, flushParams); series.iterations.flush( @@ -831,11 +850,15 @@ void Series::flushGorVBased( switch (openIterationIfDirty(it->first, it->second)) { using IO = IterationOpened; + case IO::RemainsClosed: + // we might need to proceed further if the close status is + // ClosedInFrontend + // hence no continue here + break; case IO::HasBeenOpened: + // continue below it->second.flush(flushParams); break; - case IO::RemainsClosed: - break; } // Phase 2 @@ -895,6 +918,7 @@ void Series::flushGorVBased( if (!it->second.written()) { it->second.parent() = getWritable(&series.iterations); + series.m_currentlyActiveIterations.emplace(it->first); } switch (iterationEncoding()) { @@ -1115,7 +1139,7 @@ void Series::readOneIterationFileBased(std::string const &filePath) series.iterations.readAttributes(ReadMode::OverrideExisting); } -void Series::readGorVBased(bool do_init) +std::optional> Series::readGorVBased(bool do_init) { auto &series = get(); Parameter fOpen; @@ -1187,16 +1211,35 @@ void Series::readGorVBased(bool do_init) IOHandler()->enqueue(IOTask(&series.iterations, pOpen)); readAttributes(ReadMode::IgnoreExisting); + + auto withRWAccess = [this](auto &&functor) { + auto oldStatus = IOHandler()->m_seriesStatus; + IOHandler()->m_seriesStatus = internal::SeriesStatus::Parsing; + try + { + std::forward(functor)(); + } + catch (...) + { + IOHandler()->m_seriesStatus = oldStatus; + throw; + } + IOHandler()->m_seriesStatus = oldStatus; + }; + /* * 'snapshot' changes over steps, so reread that. */ - series.iterations.readAttributes(ReadMode::OverrideExisting); + withRWAccess([&series]() { + series.iterations.readAttributes(ReadMode::OverrideExisting); + }); + /* obtain all paths inside the basepath (i.e. 
all iterations) */ Parameter pList; IOHandler()->enqueue(IOTask(&series.iterations, pList)); IOHandler()->flush(internal::defaultFlushParams); - auto readSingleIteration = [&series, &pOpen, this]( + auto readSingleIteration = [&series, &pOpen, this, withRWAccess]( uint64_t index, std::string path, bool guardAgainstRereading, @@ -1215,7 +1258,7 @@ void Series::readGorVBased(bool do_init) { pOpen.path = path; IOHandler()->enqueue(IOTask(&i, pOpen)); - i.reread(path); + withRWAccess([&i, &path]() { i.reread(path); }); } } else @@ -1235,13 +1278,18 @@ void Series::readGorVBased(bool do_init) } }; + /* + * @todo in BP5, a BeginStep() might be necessary before this + */ + auto currentSteps = currentSnapshot(); + switch (iterationEncoding()) { case IterationEncoding::groupBased: /* * Sic! This happens when a file-based Series is opened in group-based mode. */ - case IterationEncoding::fileBased: + case IterationEncoding::fileBased: { for (auto const &it : *pList.paths) { uint64_t index = std::stoull(it); @@ -1250,23 +1298,37 @@ void Series::readGorVBased(bool do_init) * (beginStep = false) * A streaming read mode might come in a future API addition. */ - readSingleIteration(index, it, true, false); + withRWAccess( + [&]() { readSingleIteration(index, it, true, false); }); } - break; + if (currentSteps.has_value()) + { + auto const &vec = currentSteps.value(); + return std::deque{vec.begin(), vec.end()}; + } + else + { + return std::optional>(); + } + } case IterationEncoding::variableBased: { - uint64_t index = 0; - if (series.iterations.containsAttribute("snapshot")) + std::deque res = {0}; + if (currentSteps.has_value() && !currentSteps.value().empty()) { - index = series.iterations.getAttribute("snapshot").get(); + res = {currentSteps.value().begin(), currentSteps.value().end()}; } - /* - * Variable-based iteration encoding relies on steps, so parsing must - * happen after opening the first step. 
- */ - readSingleIteration(index, "", false, true); - break; + for (auto it : res) + { + /* + * Variable-based iteration encoding relies on steps, so parsing + * must happen after opening the first step. + */ + withRWAccess([&]() { readSingleIteration(it, "", false, true); }); + } + return res; } } + throw std::runtime_error("Unreachable!"); } void Series::readBase() @@ -1458,9 +1520,15 @@ AdvanceStatus Series::advance( * opening an iteration's file by beginning a step on it. * So, return now. */ + iteration.get().m_closed = internal::CloseStatus::ClosedInBackend; return AdvanceStatus::OK; } + if (mode == AdvanceMode::ENDSTEP) + { + flushStep(/* doFlush = */ false); + } + Parameter param; if (itData.m_closed == internal::CloseStatus::ClosedTemporarily && series.m_iterationEncoding == IterationEncoding::fileBased) @@ -1517,6 +1585,92 @@ AdvanceStatus Series::advance( return *param.status; } +AdvanceStatus Series::advance(AdvanceMode mode) +{ + auto &series = get(); + if (series.m_iterationEncoding == IterationEncoding::fileBased) + { + throw error::Internal( + "Advancing a step in file-based iteration encoding is " + "iteration-specific."); + } + internal::FlushParams const flushParams = {FlushLevel::UserFlush}; + /* + * We call flush_impl() with flushIOHandler = false, meaning that tasks are + * not yet propagated to the backend. + * We will append ADVANCE and CLOSE_FILE tasks manually and then flush the + * IOHandler manually. + * In order to avoid having those tasks automatically appended by + * flush_impl(), set CloseStatus to Open for now. + */ + + auto begin = iterations.end(); + auto end = iterations.end(); + + switch (mode) + { + case AdvanceMode::ENDSTEP: + flush_impl(begin, end, flushParams, /* flushIOHandler = */ false); + break; + case AdvanceMode::BEGINSTEP: + /* + * When beginning a step, there is nothing to flush yet. + * Data is not written in between steps. + * So only make sure that files are accessed. 
+ */ + flush_impl( + begin, + end, + {FlushLevel::CreateOrOpenFiles}, + /* flushIOHandler = */ false); + break; + } + + if (mode == AdvanceMode::ENDSTEP) + { + flushStep(/* doFlush = */ false); + } + + Parameter param; + param.mode = mode; + IOTask task(&series.m_writable, param); + IOHandler()->enqueue(task); + + // We cannot call Series::flush now, since the IO handler is still filled + // from calling flush(Group|File)based, but has not been emptied yet + // Do that manually + IOHandler()->flush(flushParams); + + return *param.status; +} + +void Series::flushStep(bool doFlush) +{ + auto &series = get(); + if (!series.m_currentlyActiveIterations.empty() && + IOHandler()->m_frontendAccess != Access::READ_ONLY) + { + /* + * Warning: changing attribute extents over time (probably) unsupported + * by this so far. + * Not (yet) needed as there is no way to pack several iterations within + * one IO step. + */ + Parameter wAttr; + wAttr.changesOverSteps = true; + wAttr.name = "snapshot"; + wAttr.resource = std::vector{ + series.m_currentlyActiveIterations.begin(), + series.m_currentlyActiveIterations.end()}; + wAttr.dtype = Datatype::VEC_ULONGLONG; + IOHandler()->enqueue(IOTask(&series.iterations, wAttr)); + if (doFlush) + { + IOHandler()->flush(internal::defaultFlushParams); + } + } +} + auto Series::openIterationIfDirty(uint64_t index, Iteration iteration) -> IterationOpened { @@ -1795,6 +1949,7 @@ namespace internal { Series impl{{this, [](auto const *) {}}}; impl.flush(); + impl.flushStep(/* doFlush = */ true); } if (m_writeIterations.has_value()) { @@ -1886,6 +2041,46 @@ WriteIterations Series::writeIterations() return series.m_writeIterations.value(); } +std::optional> Series::currentSnapshot() const +{ + using vec_t = std::vector; + auto &series = get(); + /* + * In variable-based iteration encoding, iterations have no distinct + * group within `series.iterations`, meaning that the `snapshot` + * attribute is not found at `/data/0/snapshot`, but at + * 
`/data/snapshot`. This makes it possible to retrieve it from + * `series.iterations`. + */ + if (series.iterations.containsAttribute("snapshot")) + { + auto const &attribute = series.iterations.getAttribute("snapshot"); + switch (attribute.dtype) + { + case Datatype::ULONGLONG: + case Datatype::VEC_ULONGLONG: { + auto const &vec = attribute.get>(); + return vec_t{vec.begin(), vec.end()}; + } + case Datatype::ULONG: + case Datatype::VEC_ULONG: { + auto const &vec = attribute.get>(); + return vec_t{vec.begin(), vec.end()}; + } + default: { + std::stringstream s; + s << "Unexpected datatype for '/data/snapshot': " << attribute.dtype + << std::endl; + throw std::runtime_error(s.str()); + } + } + } + else + { + return std::optional>{}; + } +} + namespace { CleanedFilename cleanFilename( diff --git a/test/JSONTest.cpp b/test/JSONTest.cpp index 686306700e..485689acb4 100644 --- a/test/JSONTest.cpp +++ b/test/JSONTest.cpp @@ -1,8 +1,10 @@ #include "openPMD/auxiliary/JSON.hpp" #include "openPMD/auxiliary/JSON_internal.hpp" +#include "openPMD/openPMD.hpp" #include +#include #include using namespace openPMD; @@ -172,3 +174,88 @@ TEST_CASE("json_merging", "auxiliary") json::merge(defaultVal, overwrite) == json::parseOptions(expect, false).config.dump()); } + +/* + * This tests two things about the /data/snapshot attribute: + * + * 1) Reading a variable-based series without the snapshot attribute should be + * possible by assuming a default /data/snapshot = 0. + * 2) The snapshot attribute might be a vector of iterations. The Read API + * should then return the same iteration multiple times, with different + * indices. + * + * Such files are currently not created by the openPMD-api (the API currently + * supports creating a variable-based series with a scalar snapshot attribute). + * But the standard will allow both options above, so reading should at least + * be possible. 
+ * This test creates a variable-based JSON series and then uses the nlohmann + * json library to modify the resulting series for testing purposes. + */ +TEST_CASE("variableBasedModifiedSnapshot", "[auxiliary]") +{ + constexpr auto file = "../samples/variableBasedModifiedSnapshot.json"; + { + Series writeSeries(file, Access::CREATE); + writeSeries.setIterationEncoding(IterationEncoding::variableBased); + REQUIRE( + writeSeries.iterationEncoding() == + IterationEncoding::variableBased); + auto iterations = writeSeries.writeIterations(); + auto iteration = iterations[10]; + auto E_z = iteration.meshes["E"]["x"]; + E_z.resetDataset({Datatype::INT, {1}}); + E_z.makeConstant(72); + + iteration.close(); + } + + { + nlohmann::json series; + { + std::fstream fstream; + fstream.open(file, std::ios_base::in); + fstream >> series; + } + series["data"]["attributes"].erase("snapshot"); + { + std::fstream fstream; + fstream.open(file, std::ios_base::out | std::ios_base::trunc); + fstream << series; + } + } + + /* + * Need generic capture here since the compilers are being + * annoying otherwise.
+ */ + auto testRead = [&](std::vector const &requiredIterations) { + Series readSeries(file, Access::READ_ONLY); + size_t counter = 0; + for (auto const &iteration : readSeries.readIterations()) + { + REQUIRE(iteration.iterationIndex == requiredIterations[counter++]); + } + REQUIRE(counter == requiredIterations.size()); + }; + testRead(std::vector{0}); + + { + nlohmann::json series; + { + std::fstream fstream; + fstream.open(file, std::ios_base::in); + fstream >> series; + } + series["data"]["attributes"].erase("snapshot"); + auto &snapshot = series["data"]["attributes"]["snapshot"]; + snapshot["datatype"] = "VEC_ULONG"; + snapshot["value"] = std::vector{1, 2, 3, 4, 5}; + { + std::fstream fstream; + fstream.open(file, std::ios_base::out | std::ios_base::trunc); + fstream << series; + } + } + + testRead(std::vector{1, 2, 3, 4, 5}); +} diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 99cd472166..b21bcc2e97 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1489,15 +1489,8 @@ void append_mode( else { REQUIRE(read.iterations.size() == 5); + helper::listSeries(read); } - /* - * Roadmap: for now, reading this should work by ignoring the last - * duplicate iteration. - * After merging https://github.com/openPMD/openPMD-api/pull/949, we - * should see both instances when reading. - * Final goal: Read only the last instance. - */ - helper::listSeries(read); } #if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ @@ -1578,10 +1571,24 @@ TEST_CASE("append_mode", "[parallel]") } } })END"; + /* + * Troublesome combination: + * 1) ADIOS2 v2.7 + * 2) Parallel writer + * 3) Append mode + * 4) Writing to a scalar variable + * + * 4) is done by schema 2021 which will be phased out, so the tests + * are just deactivated. 
+ */ + if (auxiliary::getEnvNum("OPENPMD2_ADIOS2_SCHEMA", 0) != 0) + { + continue; + } append_mode(t, false, jsonConfigOld); - append_mode(t, false, jsonConfigNew); - append_mode(t, true, jsonConfigOld); - append_mode(t, true, jsonConfigNew); + // append_mode(t, true, jsonConfigOld); + // append_mode(t, false, jsonConfigNew); + // append_mode(t, true, jsonConfigNew); } else { diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 61172aa5f4..ac187a5d02 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -4900,8 +4900,10 @@ void serial_iterator(std::string const &file) Series readSeries(file, Access::READ_ONLY); size_t last_iteration_index = 0; + size_t numberOfIterations = 0; for (auto iteration : readSeries.readIterations()) { + ++numberOfIterations; auto E_x = iteration.meshes["E"]["x"]; REQUIRE(E_x.getDimensionality() == 1); REQUIRE(E_x.getExtent()[0] == extent); @@ -4914,6 +4916,7 @@ void serial_iterator(std::string const &file) last_iteration_index = iteration.iterationIndex; } REQUIRE(last_iteration_index == 9); + REQUIRE(numberOfIterations == 10); } TEST_CASE("serial_iterator", "[serial][adios2]") @@ -5665,7 +5668,8 @@ void iterate_nonstreaming_series( auto E_x = iteration.meshes["E"]["x"]; E_x.resetDataset( openPMD::Dataset(openPMD::Datatype::INT, {2, extent})); - std::vector data(extent, i); + int value = variableBasedLayout ? 0 : i; + std::vector data(extent, value); E_x.storeChunk(data, {0, 0}, {1, extent}); bool taskSupportedByBackend = true; DynamicMemoryView memoryView; @@ -5753,9 +5757,10 @@ void iterate_nonstreaming_series( iteration.close(); } + int value = variableBasedLayout ? 
0 : iteration.iterationIndex; for (size_t i = 0; i < extent; ++i) { - REQUIRE(chunk.get()[i] == int(iteration.iterationIndex)); + REQUIRE(chunk.get()[i] == value); REQUIRE(chunk2.get()[i] == int(i)); } last_iteration_index = iteration.iterationIndex; @@ -6175,11 +6180,12 @@ TEST_CASE("deferred_parsing", "[serial]") } } -// @todo merge this back with the chaotic_stream test of PR #949 -// (bug noticed while working on that branch) -void no_explicit_flush(std::string filename) +void chaotic_stream(std::string filename, bool variableBased) { - std::vector sampleData{5, 9, 1, 3, 4, 6, 7, 8, 2, 0}; + /* + * We will write iterations in the following order. + */ + std::vector iterations{5, 9, 1, 3, 4, 6, 7, 8, 2, 0}; std::string jsonConfig = R"( { "adios2": { @@ -6190,16 +6196,31 @@ void no_explicit_flush(std::string filename) } })"; + bool weirdOrderWhenReading{}; + { Series series(filename, Access::CREATE, jsonConfig); - for (uint64_t currentIteration = 0; currentIteration < 10; - ++currentIteration) + /* + * When using ADIOS2 steps, iterations are read not by logical order + * (iteration index), but by order of writing. 
+ */ + weirdOrderWhenReading = series.backend() == "ADIOS2" && + series.iterationEncoding() != IterationEncoding::fileBased; + if (variableBased) + { + if (series.backend() != "ADIOS2") + { + return; + } + series.setIterationEncoding(IterationEncoding::variableBased); + } + for (auto currentIteration : iterations) { auto dataset = series.writeIterations()[currentIteration] .meshes["iterationOrder"][MeshRecordComponent::SCALAR]; dataset.resetDataset({determineDatatype(), {10}}); - dataset.storeChunk(sampleData, {0}, {10}); + dataset.storeChunk(iterations, {0}, {10}); // series.writeIterations()[ currentIteration ].close(); } } @@ -6209,19 +6230,27 @@ void no_explicit_flush(std::string filename) size_t index = 0; for (const auto &iteration : series.readIterations()) { - REQUIRE(iteration.iterationIndex == index); + if (weirdOrderWhenReading) + { + REQUIRE(iteration.iterationIndex == iterations[index]); + } + else + { + REQUIRE(iteration.iterationIndex == index); + } ++index; } - REQUIRE(index == 10); + REQUIRE(index == iterations.size()); } } -TEST_CASE("no_explicit_flush", "[serial]") +TEST_CASE("chaotic_stream", "[serial]") { for (auto const &t : testedFileExtensions()) { - no_explicit_flush("../samples/no_explicit_flush_filebased_%T." + t); - no_explicit_flush("../samples/no_explicit_flush." + t); + chaotic_stream("../samples/chaotic_stream_filebased_%T." + t, false); + chaotic_stream("../samples/chaotic_stream." + t, false); + chaotic_stream("../samples/chaotic_stream_vbased." + t, true); } } @@ -6315,9 +6344,48 @@ TEST_CASE("varying_zero_pattern", "[serial]") } } +enum class ParseMode +{ + /* + * Conventional workflow. Just parse the whole thing and yield iterations + * in rising order. + */ + NoSteps, + /* + * NOTE: This mode is only temporary until the topic-linear-read PR, + * no longer necessary after that. + * The Series is parsed ahead of time upon opening, but it has steps. 
+ * Parsing ahead of time is the conventional workflow to support + * random-access. + * Reading such a Series with the streaming API is only possible if all + * steps are in ascending order, otherwise the openPMD-api has no way of + * associating IO steps with iteration indices. + * Reading such a Series with the Streaming API will become possible with + * the Linear read mode to be introduced by #1291. + */ + AheadOfTimeWithoutSnapshot, + /* + * A Series of the BP5 engine is not parsed ahead of time, but step-by-step, + * giving the openPMD-api a way to associate IO steps with iterations. + * No snapshot attribute exists, so the fallback mode is chosen: + * Iterations are returned in ascending order. + * If an IO step returns an iteration whose index is lower than the + * last one, it will be skipped. + * This mode of parsing will be generalized into the Linear read mode with + * PR #1291. + */ + LinearWithoutSnapshot, + /* + * Snapshot attribute exists and dictates the iteration index returned by + * an IO step. Duplicate iterations will be skipped.
+ */ + WithSnapshot +}; + void append_mode( std::string const &extension, bool variableBased, + ParseMode parseMode, std::string jsonConfig = "{}") { @@ -6402,35 +6470,94 @@ void append_mode( } writeSomeIterations( - write.writeIterations(), std::vector{4, 3}); + write.writeIterations(), std::vector{4, 3, 10}); + write.flush(); + } + { + Series write(filename, Access::APPEND, jsonConfig); + if (variableBased) + { + write.setIterationEncoding(IterationEncoding::variableBased); + } + if (write.backend() == "ADIOS1") + { + REQUIRE_THROWS_AS( + write.flush(), error::OperationUnsupportedInBackend); + // destructor will be noisy now + return; + } + + writeSomeIterations( + write.writeIterations(), std::vector{7, 1, 11}); write.flush(); } { Series read(filename, Access::READ_ONLY); - if (variableBased || extension == "bp5") + switch (parseMode) { + case ParseMode::NoSteps: { + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + for (auto const &iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); + } + break; + case ParseMode::LinearWithoutSnapshot: { + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 11}; + for (auto const &iteration : read.readIterations()) + { + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 7); + } + break; + case ParseMode::WithSnapshot: { // in variable-based encodings, iterations are not parsed ahead of // time but as they go unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 11}; for (auto const &iteration : read.readIterations()) { - REQUIRE(iteration.iterationIndex == counter); + REQUIRE(iteration.iterationIndex == iterationOrder[counter]); ++counter; } - REQUIRE(counter == 5); + REQUIRE(counter == 8); + // Cannot do listSeries here because the Series is already drained + REQUIRE_THROWS_AS(helper::listSeries(read), 
error::WrongAPIUsage); } - else - { - REQUIRE(read.iterations.size() == 5); + break; + case ParseMode::AheadOfTimeWithoutSnapshot: { + REQUIRE(read.iterations.size() == 8); + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 7, 10, 11}; + /* + * Use conventional read API since streaming API is not possible + * without Linear read mode. + * (See also comments inside ParseMode enum). + */ + for (auto const &iteration : read.iterations) + { + REQUIRE(iteration.first == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); + /* + * Roadmap: for now, reading this should work by ignoring the last + * duplicate iteration. + * After merging https://github.com/openPMD/openPMD-api/pull/949, we + * should see both instances when reading. + * Final goal: Read only the last instance. + */ + helper::listSeries(read); + } + break; } - /* - * Roadmap: for now, reading this should work by ignoring the last - * duplicate iteration. - * After merging https://github.com/openPMD/openPMD-api/pull/949, we - * should see both instances when reading. - * Final goal: Read only the last instance. 
- */ - helper::listSeries(read); } #if 100000000 * ADIOS2_VERSION_MAJOR + 1000000 * ADIOS2_VERSION_MINOR + \ 10000 * ADIOS2_VERSION_PATCH + 100 * ADIOS2_VERSION_TWEAK >= \ @@ -6467,16 +6594,47 @@ void append_mode( } { Series read(filename, Access::READ_ONLY); - // in variable-based encodings, iterations are not parsed ahead of - // time but as they go - unsigned counter = 0; - for (auto const &iteration : read.readIterations()) + switch (parseMode) { - REQUIRE(iteration.iterationIndex == counter); - ++counter; + case ParseMode::LinearWithoutSnapshot: { + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10}; + unsigned counter = 0; + for (auto const &iteration : read.readIterations()) + { + REQUIRE( + iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 6); + // Cannot do listSeries here because the Series is already + // drained + REQUIRE_THROWS_AS( + helper::listSeries(read), error::WrongAPIUsage); + } + break; + case ParseMode::WithSnapshot: { + // in variable-based encodings, iterations are not parsed ahead + // of time but as they go + unsigned counter = 0; + uint64_t iterationOrder[] = {0, 1, 2, 3, 4, 10, 7, 5}; + for (auto const &iteration : read.readIterations()) + { + REQUIRE( + iteration.iterationIndex == iterationOrder[counter]); + ++counter; + } + REQUIRE(counter == 8); + // Cannot do listSeries here because the Series is already + // drained + REQUIRE_THROWS_AS( + helper::listSeries(read), error::WrongAPIUsage); + } + break; + case ParseMode::NoSteps: + case ParseMode::AheadOfTimeWithoutSnapshot: + throw std::runtime_error("Test configured wrong."); + break; } - REQUIRE(counter == 6); - helper::listSeries(read); } } #endif @@ -6486,9 +6644,7 @@ TEST_CASE("append_mode", "[serial]") { for (auto const &t : testedFileExtensions()) { - if (t == "bp" || t == "bp4" || t == "bp5") - { - std::string jsonConfigOld = R"END( + std::string jsonConfigOld = R"END( { "adios2": { @@ -6499,7 +6655,7 @@ TEST_CASE("append_mode", 
"[serial]") } } })END"; - std::string jsonConfigNew = R"END( + std::string jsonConfigNew = R"END( { "adios2": { @@ -6510,14 +6666,25 @@ TEST_CASE("append_mode", "[serial]") } } })END"; - append_mode(t, false, jsonConfigOld); - append_mode(t, false, jsonConfigNew); - append_mode(t, true, jsonConfigOld); - append_mode(t, true, jsonConfigNew); + if (t == "bp5") + { + append_mode( + t, false, ParseMode::LinearWithoutSnapshot, jsonConfigOld); + append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew); + } + else if (t == "bp" || t == "bp4" || t == "bp5") + { + append_mode( + t, false, ParseMode::AheadOfTimeWithoutSnapshot, jsonConfigOld); + append_mode(t, false, ParseMode::WithSnapshot, jsonConfigNew); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigOld); + append_mode(t, true, ParseMode::WithSnapshot, jsonConfigNew); } else { - append_mode(t, false); + append_mode(t, false, ParseMode::NoSteps); } } } diff --git a/test/python/unittest/API/APITest.py b/test/python/unittest/API/APITest.py index 2503e8c619..93093626a4 100644 --- a/test/python/unittest/API/APITest.py +++ b/test/python/unittest/API/APITest.py @@ -1067,8 +1067,9 @@ def testListSeries(self): series = self.__series self.assertRaises(TypeError, io.list_series) io.list_series(series) - io.list_series(series, False) - io.list_series(series, True) + # @todo make list_series callable repeatedly + # io.list_series(series, False) + # io.list_series(series, True) print(io.list_series.__doc__)