Skip to content

Commit

Permalink
Adding mdat back (#716)
Browse files Browse the repository at this point in the history
* added function (+ tests) to extract mdat modules

* added mdat so we can write start and end times

* moved code into cpp file

* added doc and checks to stop time tests

* clang formatting

* improved error message

* more clang format

* black formatting

* cppcheck warnings fixed

* fix whoopsie

* clang formatting

* fixed test code

* intergration test with isoformat

* use f strings to make it tidier

* add milliseconds to expected output

* need to compare against byte string

* black formatting

* start and end time checks need to be utc

* urggh timestamp formatting

* urggh timestamp formatting...

* oops should be end_time in json too

* update docs

* add debug message

* write stop time even on historical jobs

* Update mdat_Writer.cpp

missing newline

* json example was malformed

* Simplified by introducing a private class

* removed duplication
  • Loading branch information
mattclarke authored Nov 8, 2023
1 parent a411421 commit b216d06
Show file tree
Hide file tree
Showing 15 changed files with 343 additions and 34 deletions.
42 changes: 42 additions & 0 deletions documentation/writer_module_mdat_metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# mdat writer module

## Stream configuration fields

This module is different from other writer modules in that it doesn't use Kafka. Instead, metadata values are set via
code.
It isn't a general metadata writer: it only works with a discrete set of named values. Other values are
ignored.

Currently, it only supports start and stop times.

## Example
Example `nexus_structure` to write start and stop times:

```json
{
"nexus_structure": {
"children": [
{
"type": "group",
"name": "entry",
"children": [
{
"module": "mdat",
"config": {
"name": "start_time"
}
},
{
"module": "mdat",
"config": {
"name": "end_time"
}
}
]
}
]
}
}
```


12 changes: 12 additions & 0 deletions integration-tests/commands/nexus_structure_historical.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
"type": "group",
"name" : "entry",
"children": [
{
"module": "mdat",
"config": {
"name": "start_time"
}
},
{
"module": "mdat",
"config": {
"name": "end_time"
}
},
{
"type": "group",
"name": "historical_data_1",
Expand Down
15 changes: 14 additions & 1 deletion integration-tests/test_filewriter_stop_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
publish_f142_message,
Severity,
)
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from streaming_data_types.fbschemas.logdata_f142.AlarmStatus import AlarmStatus
from streaming_data_types.fbschemas.logdata_f142.AlarmSeverity import AlarmSeverity
from file_writer_control.WriteJob import WriteJob
Expand All @@ -25,6 +25,7 @@ def test_start_and_stop_time_are_in_the_past(

data_topics = ["TEST_historicalData1", "TEST_historicalData2"]

# NOTE: this is local time - the filewriter will convert it to UTC
start_time = datetime(year=2019, month=6, day=12, hour=11, minute=1, second=35)
stop_time = start_time + timedelta(seconds=200)
step_time = timedelta(seconds=1)
Expand Down Expand Up @@ -99,3 +100,15 @@ def test_start_and_stop_time_are_in_the_past(
assert file["entry/historical_data_1/alarm_time"][0] == int(
(start_time + step_time).timestamp() * 1e9
) # ns
assert (
file["entry/start_time"][0]
== file_start_time.astimezone(timezone.utc)
.strftime("%Y-%m-%dT%H:%M:%S.000Z")
.encode()
)
assert (
file["entry/end_time"][0]
== file_stop_time.astimezone(timezone.utc)
.strftime("%Y-%m-%dT%H:%M:%S.000Z")
.encode()
)
22 changes: 21 additions & 1 deletion src/JobCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "HDFOperations.h"
#include "Msg.h"
#include "StreamController.h"
#include "WriterModule/mdat/mdat_Writer.h"
#include "WriterModuleBase.h"
#include "WriterRegistrar.h"
#include "json.h"
Expand Down Expand Up @@ -103,6 +104,18 @@ static std::vector<ModuleSettings> extractModuleInformationFromJson(
return SettingsList;
}

std::vector<ModuleHDFInfo>
extractMdatModules(std::vector<ModuleHDFInfo> &Modules) {
auto it = std::stable_partition(Modules.begin(), Modules.end(),
[](ModuleHDFInfo const &Module) {
return Module.WriterModule != "mdat";
});

std::vector<ModuleHDFInfo> mdatInfoList{it, Modules.end()};
Modules.erase(it, Modules.end());
return mdatInfoList;
};

std::unique_ptr<IStreamController>
createFileWritingJob(Command::StartInfo const &StartInfo, MainOpt &Settings,
Metrics::Registrar Registrar,
Expand All @@ -113,6 +126,12 @@ createFileWritingJob(Command::StartInfo const &StartInfo, MainOpt &Settings,

std::vector<ModuleHDFInfo> ModuleHDFInfoList =
initializeHDF(*Task, StartInfo.NexusStructure);
std::vector<ModuleHDFInfo> mdatInfoList =
extractMdatModules(ModuleHDFInfoList);

auto mdatWriter = std::make_unique<WriterModule::mdat::mdat_Writer>();
mdatWriter->defineMetadata(mdatInfoList);

std::vector<ModuleSettings> SettingsList =
extractModuleInformationFromJson(ModuleHDFInfoList);
std::vector<ModuleSettings> StreamSettingsList;
Expand Down Expand Up @@ -175,7 +194,8 @@ createFileWritingJob(Command::StartInfo const &StartInfo, MainOpt &Settings,
LOG_INFO("Write file with job_id: {}", Task->jobID());

return std::make_unique<StreamController>(
std::move(Task), Settings.StreamerConfiguration, Registrar, Tracker);
std::move(Task), std::move(mdatWriter), Settings.StreamerConfiguration,
Registrar, Tracker);
}

void addStreamSourceToWriterModule(vector<ModuleSettings> &StreamSettingsList,
Expand Down
9 changes: 8 additions & 1 deletion src/JobCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ initializeHDF(FileWriterTask &Task, std::string const &NexusStructureString);
/// \brief Extract information about the module (stream or link).
///
/// \param StreamInfo
/// \return The module nformation.
/// \return The module information.
ModuleSettings
extractModuleInformationFromJsonForSource(ModuleHDFInfo const &ModuleInfo);

Expand All @@ -47,4 +47,11 @@ generateWriterInstance(ModuleSettings const &StreamInfo);
void setWriterHDFAttributes(hdf5::node::Group &RootNode,
ModuleSettings const &StreamInfo);

/// \brief Extract all mdat modules from the list of modules.
///
/// \param AllModules
/// \return The mdat modules.
std::vector<ModuleHDFInfo>
extractMdatModules(std::vector<ModuleHDFInfo> &Modules);

} // namespace FileWriter
35 changes: 7 additions & 28 deletions src/StreamController.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#include "StreamController.h"
#include "FileWriterTask.h"
#include "HDFOperations.h"
#include "Kafka/ConsumerFactory.h"
#include "Kafka/MetaDataQuery.h"
#include "Kafka/MetadataException.h"
#include "MultiVector.h"
#include "Stream/Partition.h"
#include "TimeUtility.h"
#include "helper.h"
Expand All @@ -13,15 +11,18 @@
namespace FileWriter {
StreamController::StreamController(
std::unique_ptr<FileWriterTask> FileWriterTask,
std::unique_ptr<WriterModule::mdat::mdat_Writer> mdatWriter,
FileWriter::StreamerOptions const &Settings,
Metrics::Registrar const &Registrar, MetaData::TrackerPtr const &Tracker)

: WriterTask(std::move(FileWriterTask)), StreamMetricRegistrar(Registrar),
: WriterTask(std::move(FileWriterTask)), MdatWriter(std::move(mdatWriter)),
StreamMetricRegistrar(Registrar),
WriterThread([this]() { WriterTask->flushDataToFile(); },
Settings.DataFlushInterval,
Registrar.getNewRegistrar("stream")),
StreamerOptions(Settings), MetaDataTracker(Tracker) {
writeStartTime();
MdatWriter->setStartTime(Settings.StartTimestamp);
MdatWriter->setStopTime(Settings.StopTimestamp);
Executor.sendLowPriorityWork([=]() {
CurrentMetadataTimeOut = Settings.BrokerSettings.MinMetadataTimeout;
getTopicNames();
Expand All @@ -30,36 +31,14 @@ StreamController::StreamController(

StreamController::~StreamController() {
stop();
writeStopTime();
MdatWriter->writeMetadata(WriterTask.get());
LOG_INFO("Stopped StreamController for file with id : {}",
StreamController::getJobId());
}

void StreamController::writeStartTime() {
writeTimePointAsIso8601String("/entry", "start_time",
StreamerOptions.StartTimestamp);
}

void StreamController::writeStopTime() {
writeTimePointAsIso8601String("/entry", "stop_time",
StreamerOptions.StopTimestamp);
}

void StreamController::writeTimePointAsIso8601String(std::string const &Path,
std::string const &Name,
time_point const &Value) {
try {
auto StringVec = MultiVector<std::string>{{1}};
StringVec.at({0}) = toUTCDateTime(Value);
auto Group = hdf5::node::get_group(WriterTask->hdfGroup(), Path);
HDFOperations::writeStringDataset(Group, Name, StringVec);
} catch (std::exception &Error) {
LOG_ERROR("Failed to write time-point as ISO8601: {}", Error.what());
}
}

void StreamController::setStopTime(time_point const &StopTime) {
StreamerOptions.StopTimestamp = StopTime;
MdatWriter->setStopTime(StopTime);
Executor.sendWork([=]() {
for (auto &s : Streamers) {
s->setStopTime(StopTime);
Expand Down
3 changes: 3 additions & 0 deletions src/StreamController.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "Stream/Topic.h"
#include "ThreadedExecutor.h"
#include "TimeUtility.h"
#include "WriterModule/mdat/mdat_Writer.h"
#include <atomic>
#include <set>
#include <vector>
Expand All @@ -43,6 +44,7 @@ class IStreamController {
class StreamController : public IStreamController {
public:
StreamController(std::unique_ptr<FileWriterTask> FileWriterTask,
std::unique_ptr<WriterModule::mdat::mdat_Writer> mdatWriter,
FileWriter::StreamerOptions const &Settings,
Metrics::Registrar const &Registrar,
MetaData::TrackerPtr const &Tracker);
Expand Down Expand Up @@ -135,6 +137,7 @@ class StreamController : public IStreamController {
/// guarantee that its destructor is not called before the writer modules have
/// been de-allocated.
std::unique_ptr<FileWriterTask> WriterTask{nullptr};
std::unique_ptr<WriterModule::mdat::mdat_Writer> MdatWriter{nullptr};
std::vector<std::unique_ptr<Stream::Topic>> Streamers;
Metrics::Registrar StreamMetricRegistrar;
Stream::MessageWriter WriterThread;
Expand Down
1 change: 1 addition & 0 deletions src/WriterModule/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ add_subdirectory(ep00)
add_subdirectory(f144)
add_subdirectory(al00)
add_subdirectory(ep01)
add_subdirectory(mdat)
9 changes: 9 additions & 0 deletions src/WriterModule/mdat/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Source files for the mdat writer module.
set(mdat_SRC
mdat_Writer.cpp
)

# Header files for the mdat writer module.
set(mdat_INC
mdat_Writer.h
)

# Register the module with the shared writer-module build helper.
create_writer_module(mdat)
94 changes: 94 additions & 0 deletions src/WriterModule/mdat/mdat_Writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// SPDX-License-Identifier: BSD-2-Clause
//
// This code has been produced by the European Spallation Source
// and its partner institutes under the BSD 2 Clause License.
//
// See LICENSE.md at the top level for license information.
//
// Screaming Udder! https://esss.se

#include "mdat_Writer.h"
#include "FileWriterTask.h"
#include "HDFOperations.h"
#include "ModuleHDFInfo.h"
#include "MultiVector.h"
#include "TimeUtility.h"
#include "json.h"

namespace WriterModule::mdat {
/// Record which mdat modules from the NeXus structure this writer should
/// produce datasets for. Only modules whose configured name is in
/// AllowedNames are kept (see extractDetails).
void mdat_Writer::defineMetadata(std::vector<ModuleHDFInfo> const &Modules) {
  Writables = extractDetails(Modules);
}

/// Set the value of the start-time writable, if one was configured.
/// The time-point is converted to an ISO8601 UTC string.
void mdat_Writer::setStartTime(time_point startTime) {
  setWritableValueIfDefined(StartTime, startTime);
}

/// Set the value of the end-time writable, if one was configured.
/// The time-point is converted to an ISO8601 UTC string.
void mdat_Writer::setStopTime(time_point stopTime) {
  setWritableValueIfDefined(EndTime, stopTime);
}

/// Store \p Time (as an ISO8601 UTC string) in the writable called \p Name.
/// Does nothing if no writable of that name was configured.
void mdat_Writer::setWritableValueIfDefined(std::string const &Name,
                                            time_point const &Time) {
  auto Entry = Writables.find(Name);
  if (Entry != Writables.end()) {
    Entry->second.Value = toUTCDateTime(Time);
  }
}

/// Write every configured writable that has both an allowed name and a
/// value to the HDF file owned by \p Task.
void mdat_Writer::writeMetadata(FileWriter::FileWriterTask const *Task) const {
  for (auto const &[Name, Info] : Writables) {
    bool const NameIsAllowed =
        std::find(AllowedNames.cbegin(), AllowedNames.cend(), Name) !=
        AllowedNames.cend();
    if (NameIsAllowed && Info.isWritable()) {
      writeStringValue(Task, Name, Info.Path, Info.Value);
    }
  }
}

/// Write a single scalar string dataset called \p Name under the HDF group
/// at \p Path. Failures are logged rather than propagated so one bad value
/// cannot abort file finalisation.
void mdat_Writer::writeStringValue(FileWriter::FileWriterTask const *Task,
                                   std::string const &Name,
                                   std::string const &Path,
                                   std::string const &Value) {
  try {
    auto Data = MultiVector<std::string>{{1}};
    Data.at({0}) = Value;
    auto ParentGroup = hdf5::node::get_group(Task->hdfGroup(), Path);
    HDFOperations::writeStringDataset(ParentGroup, Name, Data);
  } catch (std::exception const &Error) {
    LOG_ERROR("Failed to write mdat string value: {}", Error.what());
  }
}

std::unordered_map<std::string, mdat_Writer::Writable>
mdat_Writer::extractDetails(std::vector<ModuleHDFInfo> const &Modules) const {
std::unordered_map<std::string, Writable> Details;

for (auto const &Module : Modules) {
if (Module.WriterModule != "mdat") {
continue;
}

auto const name = extractName(Module.ConfigStream);
if (name && std::find(AllowedNames.begin(), AllowedNames.end(), name) !=
AllowedNames.end()) {
Writable NewWritable;
NewWritable.Path = Module.HDFParentName;
Details[name.value()] = NewWritable;
}
}
return Details;
}

std::optional<std::string>
mdat_Writer::extractName(std::string const &configJson) {
nlohmann::json json = nlohmann::json::parse(configJson);
for (auto it = json.begin(); it != json.end(); ++it) {
if (it.key() == "name" && !it.value().empty()) {
return it.value();
}
}
return {};
}
} // namespace WriterModule::mdat
Loading

0 comments on commit b216d06

Please sign in to comment.