diff --git a/documentation/writer_module_mdat_metadata.md b/documentation/writer_module_mdat_metadata.md new file mode 100644 index 000000000..5d537c9ce --- /dev/null +++ b/documentation/writer_module_mdat_metadata.md @@ -0,0 +1,42 @@ +# mdat writer module + +## Stream configuration fields + +This module is different to other writer modules in that it doesn't use Kafka. Instead metadata data values are set via +code. +It isn't a general metadata writer, there are only a discrete set of named values it will work with. Other values are +ignored. + +Currently, it only supports start and stop times. + +## Example +Example `nexus_structure` to write start and stop times: + +```json +{ + "nexus_structure": { + "children": [ + { + "type": "group", + "name": "entry", + "children": [ + { + "module": "mdat", + "config": { + "name": "start_time" + } + }, + { + "module": "mdat", + "config": { + "name": "end_time" + } + } + ] + } + ] + } +} +``` + + diff --git a/integration-tests/commands/nexus_structure_historical.json b/integration-tests/commands/nexus_structure_historical.json index 4dd6b41aa..c2913722c 100644 --- a/integration-tests/commands/nexus_structure_historical.json +++ b/integration-tests/commands/nexus_structure_historical.json @@ -4,6 +4,18 @@ "type": "group", "name" : "entry", "children": [ + { + "module": "mdat", + "config": { + "name": "start_time" + } + }, + { + "module": "mdat", + "config": { + "name": "end_time" + } + }, { "type": "group", "name": "historical_data_1", diff --git a/integration-tests/test_filewriter_stop_time.py b/integration-tests/test_filewriter_stop_time.py index fb973c697..d9d115970 100644 --- a/integration-tests/test_filewriter_stop_time.py +++ b/integration-tests/test_filewriter_stop_time.py @@ -4,7 +4,7 @@ publish_f142_message, Severity, ) -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from streaming_data_types.fbschemas.logdata_f142.AlarmStatus import AlarmStatus from 
streaming_data_types.fbschemas.logdata_f142.AlarmSeverity import AlarmSeverity from file_writer_control.WriteJob import WriteJob @@ -25,6 +25,7 @@ def test_start_and_stop_time_are_in_the_past( data_topics = ["TEST_historicalData1", "TEST_historicalData2"] + # NOTE: this is local time - the filewriter will convert it to UTC start_time = datetime(year=2019, month=6, day=12, hour=11, minute=1, second=35) stop_time = start_time + timedelta(seconds=200) step_time = timedelta(seconds=1) @@ -99,3 +100,15 @@ def test_start_and_stop_time_are_in_the_past( assert file["entry/historical_data_1/alarm_time"][0] == int( (start_time + step_time).timestamp() * 1e9 ) # ns + assert ( + file["entry/start_time"][0] + == file_start_time.astimezone(timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%S.000Z") + .encode() + ) + assert ( + file["entry/end_time"][0] + == file_stop_time.astimezone(timezone.utc) + .strftime("%Y-%m-%dT%H:%M:%S.000Z") + .encode() + ) diff --git a/src/JobCreator.cpp b/src/JobCreator.cpp index 5ca1613cf..f54d5ac39 100644 --- a/src/JobCreator.cpp +++ b/src/JobCreator.cpp @@ -15,6 +15,7 @@ #include "HDFOperations.h" #include "Msg.h" #include "StreamController.h" +#include "WriterModule/mdat/mdat_Writer.h" #include "WriterModuleBase.h" #include "WriterRegistrar.h" #include "json.h" @@ -103,6 +104,18 @@ static std::vector extractModuleInformationFromJson( return SettingsList; } +std::vector +extractMdatModules(std::vector &Modules) { + auto it = std::stable_partition(Modules.begin(), Modules.end(), + [](ModuleHDFInfo const &Module) { + return Module.WriterModule != "mdat"; + }); + + std::vector mdatInfoList{it, Modules.end()}; + Modules.erase(it, Modules.end()); + return mdatInfoList; +}; + std::unique_ptr createFileWritingJob(Command::StartInfo const &StartInfo, MainOpt &Settings, Metrics::Registrar Registrar, @@ -113,6 +126,12 @@ createFileWritingJob(Command::StartInfo const &StartInfo, MainOpt &Settings, std::vector ModuleHDFInfoList = initializeHDF(*Task, 
StartInfo.NexusStructure); + std::vector mdatInfoList = + extractMdatModules(ModuleHDFInfoList); + + auto mdatWriter = std::make_unique(); + mdatWriter->defineMetadata(mdatInfoList); + std::vector SettingsList = extractModuleInformationFromJson(ModuleHDFInfoList); std::vector StreamSettingsList; @@ -175,7 +194,8 @@ createFileWritingJob(Command::StartInfo const &StartInfo, MainOpt &Settings, LOG_INFO("Write file with job_id: {}", Task->jobID()); return std::make_unique( - std::move(Task), Settings.StreamerConfiguration, Registrar, Tracker); + std::move(Task), std::move(mdatWriter), Settings.StreamerConfiguration, + Registrar, Tracker); } void addStreamSourceToWriterModule(vector &StreamSettingsList, diff --git a/src/JobCreator.h b/src/JobCreator.h index c1fd8ca3d..ccf14ad12 100644 --- a/src/JobCreator.h +++ b/src/JobCreator.h @@ -37,7 +37,7 @@ initializeHDF(FileWriterTask &Task, std::string const &NexusStructureString); /// \brief Extract information about the module (stream or link). /// /// \param StreamInfo -/// \return The module nformation. +/// \return The module information. ModuleSettings extractModuleInformationFromJsonForSource(ModuleHDFInfo const &ModuleInfo); @@ -47,4 +47,11 @@ generateWriterInstance(ModuleSettings const &StreamInfo); void setWriterHDFAttributes(hdf5::node::Group &RootNode, ModuleSettings const &StreamInfo); +/// \brief Extract all mdat modules from the list of modules. +/// +/// \param Modules +/// \return The mdat modules. 
+std::vector +extractMdatModules(std::vector &Modules); + } // namespace FileWriter diff --git a/src/StreamController.cpp b/src/StreamController.cpp index 2d7dc1655..11b3507af 100644 --- a/src/StreamController.cpp +++ b/src/StreamController.cpp @@ -1,10 +1,8 @@ #include "StreamController.h" #include "FileWriterTask.h" #include "HDFOperations.h" -#include "Kafka/ConsumerFactory.h" #include "Kafka/MetaDataQuery.h" #include "Kafka/MetadataException.h" -#include "MultiVector.h" #include "Stream/Partition.h" #include "TimeUtility.h" #include "helper.h" @@ -13,15 +11,18 @@ namespace FileWriter { StreamController::StreamController( std::unique_ptr FileWriterTask, + std::unique_ptr mdatWriter, FileWriter::StreamerOptions const &Settings, Metrics::Registrar const &Registrar, MetaData::TrackerPtr const &Tracker) - : WriterTask(std::move(FileWriterTask)), StreamMetricRegistrar(Registrar), + : WriterTask(std::move(FileWriterTask)), MdatWriter(std::move(mdatWriter)), + StreamMetricRegistrar(Registrar), WriterThread([this]() { WriterTask->flushDataToFile(); }, Settings.DataFlushInterval, Registrar.getNewRegistrar("stream")), StreamerOptions(Settings), MetaDataTracker(Tracker) { - writeStartTime(); + MdatWriter->setStartTime(Settings.StartTimestamp); + MdatWriter->setStopTime(Settings.StopTimestamp); Executor.sendLowPriorityWork([=]() { CurrentMetadataTimeOut = Settings.BrokerSettings.MinMetadataTimeout; getTopicNames(); @@ -30,36 +31,14 @@ StreamController::StreamController( StreamController::~StreamController() { stop(); - writeStopTime(); + MdatWriter->writeMetadata(WriterTask.get()); LOG_INFO("Stopped StreamController for file with id : {}", StreamController::getJobId()); } -void StreamController::writeStartTime() { - writeTimePointAsIso8601String("/entry", "start_time", - StreamerOptions.StartTimestamp); -} - -void StreamController::writeStopTime() { - writeTimePointAsIso8601String("/entry", "stop_time", - StreamerOptions.StopTimestamp); -} - -void 
StreamController::writeTimePointAsIso8601String(std::string const &Path, - std::string const &Name, - time_point const &Value) { - try { - auto StringVec = MultiVector{{1}}; - StringVec.at({0}) = toUTCDateTime(Value); - auto Group = hdf5::node::get_group(WriterTask->hdfGroup(), Path); - HDFOperations::writeStringDataset(Group, Name, StringVec); - } catch (std::exception &Error) { - LOG_ERROR("Failed to write time-point as ISO8601: {}", Error.what()); - } -} - void StreamController::setStopTime(time_point const &StopTime) { StreamerOptions.StopTimestamp = StopTime; + MdatWriter->setStopTime(StopTime); Executor.sendWork([=]() { for (auto &s : Streamers) { s->setStopTime(StopTime); diff --git a/src/StreamController.h b/src/StreamController.h index c8f0ddb84..872b3c25c 100644 --- a/src/StreamController.h +++ b/src/StreamController.h @@ -20,6 +20,7 @@ #include "Stream/Topic.h" #include "ThreadedExecutor.h" #include "TimeUtility.h" +#include "WriterModule/mdat/mdat_Writer.h" #include #include #include @@ -43,6 +44,7 @@ class IStreamController { class StreamController : public IStreamController { public: StreamController(std::unique_ptr FileWriterTask, + std::unique_ptr mdatWriter, FileWriter::StreamerOptions const &Settings, Metrics::Registrar const &Registrar, MetaData::TrackerPtr const &Tracker); @@ -135,6 +137,7 @@ class StreamController : public IStreamController { /// guarantee that its destructor is not called before the writer modules have /// been de-allocated. 
std::unique_ptr WriterTask{nullptr}; + std::unique_ptr MdatWriter{nullptr}; std::vector> Streamers; Metrics::Registrar StreamMetricRegistrar; Stream::MessageWriter WriterThread; diff --git a/src/WriterModule/CMakeLists.txt b/src/WriterModule/CMakeLists.txt index 17690e710..32fef3596 100644 --- a/src/WriterModule/CMakeLists.txt +++ b/src/WriterModule/CMakeLists.txt @@ -15,3 +15,4 @@ add_subdirectory(ep00) add_subdirectory(f144) add_subdirectory(al00) add_subdirectory(ep01) +add_subdirectory(mdat) diff --git a/src/WriterModule/mdat/CMakeLists.txt b/src/WriterModule/mdat/CMakeLists.txt new file mode 100644 index 000000000..ebb028a74 --- /dev/null +++ b/src/WriterModule/mdat/CMakeLists.txt @@ -0,0 +1,9 @@ +set(mdat_SRC + mdat_Writer.cpp +) + +set(mdat_INC + mdat_Writer.h +) + +create_writer_module(mdat) diff --git a/src/WriterModule/mdat/mdat_Writer.cpp b/src/WriterModule/mdat/mdat_Writer.cpp new file mode 100644 index 000000000..daf2de1d0 --- /dev/null +++ b/src/WriterModule/mdat/mdat_Writer.cpp @@ -0,0 +1,94 @@ +// SPDX-License-Identifier: BSD-2-Clause +// +// This code has been produced by the European Spallation Source +// and its partner institutes under the BSD 2 Clause License. +// +// See LICENSE.md at the top level for license information. +// +// Screaming Udder! 
https://esss.se + +#include "mdat_Writer.h" +#include "FileWriterTask.h" +#include "HDFOperations.h" +#include "ModuleHDFInfo.h" +#include "MultiVector.h" +#include "TimeUtility.h" +#include "json.h" + +namespace WriterModule::mdat { +void mdat_Writer::defineMetadata(std::vector const &Modules) { + Writables = extractDetails(Modules); +} + +void mdat_Writer::setStartTime(time_point startTime) { + setWritableValueIfDefined(StartTime, startTime); +} + +void mdat_Writer::setStopTime(time_point stopTime) { + setWritableValueIfDefined(EndTime, stopTime); +} + +void mdat_Writer::setWritableValueIfDefined(std::string const &Name, + time_point const &Time) { + if (auto Result = Writables.find(Name); Result != Writables.end()) { + Result->second.Value = toUTCDateTime(Time); + } +} + +void mdat_Writer::writeMetadata(FileWriter::FileWriterTask const *Task) const { + for (auto const &[Name, Value] : Writables) { + if (std::find(AllowedNames.cbegin(), AllowedNames.cend(), Name) == + AllowedNames.end()) { + continue; + } + if (Value.isWritable()) { + writeStringValue(Task, Name, Value.Path, Value.Value); + } + } +} + +void mdat_Writer::writeStringValue(FileWriter::FileWriterTask const *Task, + std::string const &Name, + std::string const &Path, + std::string const &Value) { + try { + auto StringVec = MultiVector{{1}}; + StringVec.at({0}) = Value; + auto Group = hdf5::node::get_group(Task->hdfGroup(), Path); + HDFOperations::writeStringDataset(Group, Name, StringVec); + } catch (std::exception &Error) { + LOG_ERROR("Failed to write mdat string value: {}", Error.what()); + } +} + +std::unordered_map +mdat_Writer::extractDetails(std::vector const &Modules) const { + std::unordered_map Details; + + for (auto const &Module : Modules) { + if (Module.WriterModule != "mdat") { + continue; + } + + auto const name = extractName(Module.ConfigStream); + if (name && std::find(AllowedNames.begin(), AllowedNames.end(), name) != + AllowedNames.end()) { + Writable NewWritable; + NewWritable.Path 
= Module.HDFParentName; + Details[name.value()] = NewWritable; + } + } + return Details; +} + +std::optional +mdat_Writer::extractName(std::string const &configJson) { + nlohmann::json json = nlohmann::json::parse(configJson); + for (auto it = json.begin(); it != json.end(); ++it) { + if (it.key() == "name" && !it.value().empty()) { + return it.value(); + } + } + return {}; +} +} // namespace WriterModule::mdat diff --git a/src/WriterModule/mdat/mdat_Writer.h b/src/WriterModule/mdat/mdat_Writer.h new file mode 100644 index 000000000..45f7067ef --- /dev/null +++ b/src/WriterModule/mdat/mdat_Writer.h @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: BSD-2-Clause +// +// This code has been produced by the European Spallation Source +// and its partner institutes under the BSD 2 Clause License. +// +// See LICENSE.md at the top level for license information. +// +// Screaming Udder! https://esss.se + +#pragma once + +#include "FileWriterTask.h" +#include "ModuleHDFInfo.h" +#include "TimeUtility.h" + +namespace WriterModule::mdat { + +/// \brief Used to write basic metadata such as start time, etc. +/// +/// It works differently to other writer modules in that it doesn't listen to +/// a Kafka topic. Instead values to be written are set in regular code. +class mdat_Writer { +public: + /// \brief Work out what data to write based on the contents of mdat modules. + /// + /// \param Modules + void defineMetadata(std::vector const &Modules); + + /// \brief Set start time which should be written to the file. + /// + /// \param startTime + void setStartTime(time_point startTime); + + /// \brief Set stop time which should be written to the file. + /// + /// \param stopTime + void setStopTime(time_point stopTime); + + /// \brief Write any defined values to the HDF file. + /// + /// \note Nothing will be written until this is called. 
+ /// + /// \param Task + void writeMetadata(FileWriter::FileWriterTask const *Task) const; + +private: + struct Writable { + std::string Path; + std::string Value; + + [[nodiscard]] bool isWritable() const { + return !Path.empty() && !Value.empty(); + } + }; + + void static writeStringValue(FileWriter::FileWriterTask const *Task, + std::string const &Name, std::string const &Path, + std::string const &Value); + + [[nodiscard]] std::unordered_map + extractDetails(std::vector const &Modules) const; + + [[nodiscard]] std::optional static extractName( + std::string const &configJson); + + void setWritableValueIfDefined(std::string const &Name, + time_point const &Time); + + inline static const std::string StartTime = "start_time"; + inline static const std::string EndTime = "end_time"; + std::vector const AllowedNames{StartTime, EndTime}; + std::unordered_map Writables; +}; +} // namespace WriterModule::mdat diff --git a/src/tests/JobCreatorTests.cpp b/src/tests/JobCreatorTests.cpp index b6a4554b3..5843ab5bc 100644 --- a/src/tests/JobCreatorTests.cpp +++ b/src/tests/JobCreatorTests.cpp @@ -128,3 +128,24 @@ TEST_F(JobCreator, SetWriterAttributes) { RootGroup.get_group("entry").attributes["NX_class"].read(TempString); EXPECT_EQ(TempString, "NXlog"); } + +TEST(ExtractMdat, ExtractsAllMdatModulesFromModuleList) { + std::vector ModuleList{ + {"not mdat", ":: parent ::", ":: stream ::"}, + {"mdat", ":: parent ::", ":: stream ::"}, + {"not mdat", ":: parent ::", ":: stream ::"}, + {"mdat", ":: parent ::", ":: stream ::"}, + {"not mdat", ":: parent ::", ":: stream ::"}}; + + auto MdatModules = FileWriter::extractMdatModules(ModuleList); + + ASSERT_EQ(ModuleList.size(), static_cast(3)); + ASSERT_EQ(MdatModules.size(), static_cast(2)); + std::for_each( + MdatModules.cbegin(), MdatModules.cend(), + [](auto const &Module) { ASSERT_EQ(Module.WriterModule, "mdat"); }); + + std::for_each(ModuleList.cbegin(), ModuleList.cend(), [](auto const &Module) { + 
ASSERT_NE(Module.WriterModule, "mdat"); + }); +} diff --git a/src/tests/StreamControllerTests.cpp b/src/tests/StreamControllerTests.cpp index f8c3d5e6b..e9b99ccd1 100644 --- a/src/tests/StreamControllerTests.cpp +++ b/src/tests/StreamControllerTests.cpp @@ -25,11 +25,14 @@ class StreamControllerTests : public ::testing::Test { public: void SetUp() override { FileWriterTask = std::make_unique( - TestRegistrar, std::make_shared()); + TestRegistrar, + + std::make_shared()); FileWriterTask->setJobId(JobId); StreamController = std::make_unique( - std::move(FileWriterTask), FileWriter::StreamerOptions(), - Metrics::Registrar("some-app", {}), + std::move(FileWriterTask), + std::make_unique(), + FileWriter::StreamerOptions(), Metrics::Registrar("some-app", {}), std::make_shared()); }; std::string JobId = "TestID"; diff --git a/src/tests/WriterModule/CMakeLists.txt b/src/tests/WriterModule/CMakeLists.txt index 0389b6212..d607ca189 100644 --- a/src/tests/WriterModule/CMakeLists.txt +++ b/src/tests/WriterModule/CMakeLists.txt @@ -14,6 +14,7 @@ set(writer_module_SRC f144_WriterTests.cpp al00_WriterTests.cpp ep01_WriterTests.cpp + mdat_WriterTests.cpp ) add_library(writer_module OBJECT ${writer_module_SRC}) diff --git a/src/tests/WriterModule/mdat_WriterTests.cpp b/src/tests/WriterModule/mdat_WriterTests.cpp new file mode 100644 index 000000000..d27144857 --- /dev/null +++ b/src/tests/WriterModule/mdat_WriterTests.cpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: BSD-2-Clause +// +// This code has been produced by the European Spallation Source +// and its partner institutes under the BSD 2 Clause License. +// +// See LICENSE.md at the top level for license information. +// +// Screaming Udder! 
https://esss.se + +#include "ModuleHDFInfo.h" +#include "WriterModule/mdat/mdat_Writer.h" +#include + +TEST(mdatWriterTests, IgnoresSettingStartTimeIfNotInNotDefined) { + std::vector MdatModules = { + {"mdat", "/entry", "{\"name\":\"end_time\"}"}}; + WriterModule::mdat::mdat_Writer Writer; + Writer.defineMetadata(MdatModules); + + EXPECT_NO_THROW(Writer.setStartTime(time_point{123456ms})); +} + +TEST(mdatWriterTests, IgnoresSettingEndTimeIfNotInNotDefined) { + std::vector MdatModules = { + {"mdat", "/entry", "{\"name\":\"start_time\"}"}}; + WriterModule::mdat::mdat_Writer Writer; + Writer.defineMetadata(MdatModules); + + EXPECT_NO_THROW(Writer.setStopTime(time_point{123456ms})); +}