-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
end of day commit. tidy up some logic, get rid of things we dont care…
… about in output file, make precision logic a bit clearer. Also include callback in demo plan, make fixture for tests (which are broken atm)
- Loading branch information
Showing
5 changed files
with
144 additions
and
148 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
"""Creates a readable .txt file of Bluesky runengine dataset""" | ||
|
||
import csv | ||
from datetime import datetime | ||
from collections import OrderedDict | ||
from typing import Optional | ||
from pathlib import Path | ||
from bluesky.callbacks import CallbackBase | ||
from event_model.documents.event import Event | ||
from event_model.documents.event_descriptor import EventDescriptor | ||
from event_model.documents.run_start import RunStart | ||
from event_model.documents.run_stop import RunStop | ||
|
||
|
||
class HumanReadableOutputFileLoggingCallback(CallbackBase): | ||
"""Outputs bluesky runs to human-readable output files in the specified directory path.""" | ||
|
||
def __init__(self, fields: list[str], output_dir: Path) -> None: | ||
"""Initialise current_start_document and filename""" | ||
super().__init__() | ||
self.fields: list[str] = fields | ||
self.output_dir: Path = output_dir | ||
self.current_start_document: Optional[str] = None | ||
self.descriptors: dict[str, EventDescriptor] = {} | ||
self.filename: Optional[str] = None | ||
|
||
def start(self, doc: RunStart) -> None: | ||
"""Start writing an output file. This involves creating the file if it doesn't already exist | ||
then putting the metadata ie. start time, uid in the header.""" | ||
self.output_dir.mkdir(parents=True, exist_ok=True) | ||
self.current_start_document = doc["uid"] | ||
self.filename = self.output_dir / f"{self.current_start_document}.txt" | ||
|
||
assert self.filename is not None, "Could not create filename." | ||
assert self.current_start_document is not None, "Saw a non-start document before a start." | ||
|
||
exclude_list = ["time", "plan_name", "plan_type", "scan_id", "versions"] | ||
header_data = {k: v for k, v in doc.items() if k not in exclude_list} | ||
|
||
datetime_obj = datetime.fromtimestamp(doc["time"]) | ||
formatted_time = datetime_obj.strftime("%Y-%m-%d %H:%M:%S") | ||
header_data["start_time"] = formatted_time | ||
|
||
with open(self.filename, "a") as outfile: | ||
for key, value in header_data.items(): | ||
outfile.write(f"{key}: {value}\n") | ||
|
||
def descriptor(self, doc: EventDescriptor) -> None: | ||
if doc["name"] != "primary": | ||
return | ||
|
||
descriptor_id = doc["uid"] | ||
self.descriptors[descriptor_id] = doc | ||
|
||
def event(self, doc: Event) -> None: | ||
"""Append an event's output to the file""" | ||
formatted_event_data = {} | ||
descriptor_id = doc["descriptor"] | ||
event_data = doc["data"] | ||
descriptor_data = self.descriptors[descriptor_id]["data_keys"] | ||
|
||
for data_key, data_value in event_data.items(): | ||
if data_key in self.fields: | ||
formatted_event_data[data_key] = ( | ||
f"{data_value:.{descriptor_data[data_key]['precision']}f}" | ||
if "precision" in descriptor_data[data_key] and isinstance(data_value, float) | ||
else data_value | ||
) | ||
|
||
with open(self.filename, "a", newline="") as outfile: | ||
if doc["seq_num"] == 1: | ||
# If this is the first event, write out the units before writing event data | ||
units_dict = OrderedDict( | ||
{ | ||
key: value.get("units", "n/a") | ||
for key, value in descriptor_data.items() | ||
if key in self.fields | ||
} | ||
) | ||
units_line = " ".join(f"{key} ({value})" for key, value in (units_dict.items())) | ||
outfile.write(f"\n{units_line}\n") | ||
writer = csv.DictWriter(outfile, fieldnames=formatted_event_data, delimiter="\t") | ||
writer.writerows([formatted_event_data]) | ||
|
||
def stop(self, doc: RunStop) -> None: | ||
self.descriptors.clear() | ||
return super().stop(doc) |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,57 @@ | ||
# pyright: reportMissingParameterType=false | ||
|
||
import pytest | ||
import json | ||
from unittest.mock import mock_open, MagicMock, patch | ||
from pathlib import Path | ||
from typing import Generator | ||
|
||
import bluesky.plan_stubs as bps | ||
from bluesky.run_engine import RunEngineResult | ||
from bluesky.utils import Msg | ||
|
||
from ibex_bluesky_core.callbacks.write_log import OutputLoggingCallback | ||
|
||
from ibex_bluesky_core.callbacks.file_logger import HumanReadableOutputFileLoggingCallback | ||
|
||
|
||
m = mock_open() | ||
save_path = Path("C:\\") / "instrument" / "var" / "logs" / "bluesky" / "output_files" | ||
doc = {"uid": "23097941-783c-4353-8809-d8038a52517f", | ||
"time": 1726242068.520683, | ||
"versions": {"bluesky": "1.13.0a4"}, | ||
"scan_id": 1, "plan_type": "generator", | ||
"plan_name": "example", | ||
"name": "primary", | ||
"test": "test", | ||
"descriptor": "23097941-783c-4353-8809-d8038a52517f", | ||
"data" :{ | ||
'mot-setpoint_readback': 1.0, | ||
'mot': 1.0, | ||
'DAE-good_uah': 0.8474488258361816, | ||
'DAE': 0.8474488258361816 | ||
} } | ||
|
||
testing_function = OutputLoggingCallback(['block', 'dae'], save_path) | ||
|
||
def test_start_data(RE): | ||
with patch("ibex_bluesky_core.callbacks.write_log.open", m) as mock_file: | ||
testing_function.start(doc) | ||
doc = { | ||
"uid": "23097941-783c-4353-8809-d8038a52517f", | ||
"time": 1726242068.520683, | ||
"versions": {"bluesky": "1.13.0a4"}, | ||
"scan_id": 1, | ||
"plan_type": "generator", | ||
"plan_name": "example", | ||
"name": "primary", | ||
"test": "test", | ||
"descriptor": "23097941-783c-4353-8809-d8038a52517f", | ||
"data": { | ||
"mot-setpoint_readback": 1.0, | ||
"mot": 1.0, | ||
"DAE-good_uah": 0.8474488258361816, | ||
"DAE": 0.8474488258361816, | ||
}, | ||
} | ||
|
||
|
||
@pytest.fixture | ||
def cb() -> HumanReadableOutputFileLoggingCallback: | ||
return HumanReadableOutputFileLoggingCallback(["block", "dae"], save_path) | ||
|
||
|
||
def test_start_data(cb): | ||
with patch("ibex_bluesky_core.callbacks.file_logger.open", m) as mock_file: | ||
cb.start(doc) | ||
result = save_path / f"{doc['uid']}.txt" | ||
|
||
mock_file.assert_called_with(result, 'a') | ||
mock_file.assert_called_with(result, "a") | ||
|
||
expected_content = f"{list(doc.keys())[-1]}: {list(doc.values())[-1]}\n" | ||
mock_file().write.assert_called_with(expected_content) | ||
mock_file.write.assert_called_with(expected_content) | ||
|
||
|
||
def test_descriptor_data(cb): | ||
cb.descriptor(doc) | ||
|
||
def test_descriptor_data(RE): | ||
testing_function.descriptor(doc) | ||
assert doc["uid"] in cb.descriptors | ||
assert cb.descriptors[doc["uid"]] == doc | ||
|
||
assert doc['uid'] in testing_function.descriptors | ||
assert testing_function.descriptors[doc['uid']] == doc | ||
|
||
def test_event_data(RE): | ||
testing_function.event(doc) | ||
def test_event_data(cb): | ||
cb.event(doc) | ||
|
||
assert doc['DAE'] in testing_function.event | ||
assert doc["DAE"] in cb.event |