From 0a3dea7b670ce2a207112a247b1fdf7306ff33e2 Mon Sep 17 00:00:00 2001 From: Steven Shofner Date: Mon, 6 Apr 2020 15:19:32 -0700 Subject: [PATCH 1/2] Adds the StreamingBinWriter class that allows bin data to be streamed to disk instead of stored in memory. To facilitate this, the Writer class was updated as follows: 1. The output filename and extension are now member variables initialized with the an object of the class. 2. Creation of the directory was moved to an externally visible method to allow a user of objects of the Writer class to create the directory explicitly (StreamingBinWriter needs a valid path to write to on initialization). --- mffpy/bin_writer.py | 19 ++++++++++-- mffpy/tests/test_devices.py | 2 +- mffpy/tests/test_writer.py | 58 ++++++++++++++++++++++++++++++++++++- mffpy/writer.py | 24 +++++++++------ 4 files changed, 89 insertions(+), 14 deletions(-) diff --git a/mffpy/bin_writer.py b/mffpy/bin_writer.py index 36fc206..fd51567 100644 --- a/mffpy/bin_writer.py +++ b/mffpy/bin_writer.py @@ -13,8 +13,9 @@ ANY KIND, either express or implied. """ from os import SEEK_SET -from io import BytesIO +from io import BytesIO, FileIO from typing import List, Union +from os.path import join import numpy as np @@ -25,8 +26,7 @@ compute_header_byte_size ) - -class BinWriter: +class BinWriter(object): default_filename = 'signal1.bin' default_info_filename = 'info1.xml' @@ -138,3 +138,16 @@ def write(self, filename: str, *args, **kwargs): num_written = fo.write(byts) assert num_written == len(byts), f""" Wrote {num_written} bytes (expected {len(byts)})""" + +class StreamingBinWriter(BinWriter): + + """ + Subclass of BinWriter to support streaming bin file to disk. + """ + + def __init__(self, sampling_rate: int, mffdir: str, data_type: str = 'EEG'): + super().__init__(sampling_rate, data_type) + self.stream = FileIO(join(mffdir, self.default_filename), mode='w') + + def write(self, filename: str, *args, **kwargs): + self.stream.close() diff --git a/mffpy/tests/test_devices.py b/mffpy/tests/test_devices.py index d779c56..e5db0e8 100644 --- a/mffpy/tests/test_devices.py +++ b/mffpy/tests/test_devices.py @@ -45,6 +45,6 @@ def test_devices(device): for i, (_, props) in enumerate(coords.sensors.items()) ], dtype=np.float) device = basename(splitext(device)[0]) if exists(device) else device - expected = np.load(join(resources_dir, 'testing', device+'.npy')) + expected = np.load(join(resources_dir, 'testing', device+'.npy'), allow_pickle=True) assert locs.shape == expected.shape assert locs == pytest.approx(expected) diff --git a/mffpy/tests/test_writer.py b/mffpy/tests/test_writer.py index 971b18b..53fbace 100644 --- a/mffpy/tests/test_writer.py +++ b/mffpy/tests/test_writer.py @@ -15,13 +15,14 @@ from datetime import datetime from os import makedirs, rmdir, remove from os.path import join +from shutil import rmtree import pytest import json import numpy as np from ..writer import Writer -from ..bin_writer import BinWriter +from ..bin_writer import BinWriter, StreamingBinWriter from ..reader import Reader from ..xml_files import XML @@ -110,3 +111,58 @@ def test_writer_exports_JSON(): remove(filename) except BaseException: raise AssertionError(f"""Clean-up failed of '{filename}'.""") + +##START## + +def test_streaming_writer_receives_bad_init_data(): + """Test bin writer fails when initialized with non-int sampling rate""" + dirname = 'testdir.mff' + makedirs(dirname) + StreamingBinWriter(100, mffdir=dirname) + with pytest.raises(AssertionError): + StreamingBinWriter(100.0, mffdir=dirname) + rmtree(dirname) + +def test_streaming_writer_writes(): + dirname = 'testdir2.mff' + # create some data and add it to a binary writer + device = 'HydroCel GSN 256 1.0' + num_samples = 10 + num_channels = 256 + sampling_rate = 128 + # create an mffpy.Writer and add a file info, and the binary file + W = Writer(dirname) + W.create_directory() + b = StreamingBinWriter(sampling_rate=sampling_rate, data_type='EEG', mffdir=dirname) + data = np.random.randn(num_channels, num_samples).astype(np.float32) + b.add_block(data) + startdatetime = datetime.strptime( + '1984-02-18T14:00:10.000000+0100', XML._time_format) + W.addxml('fileInfo', recordTime=startdatetime) + W.add_coordinates_and_sensor_layout(device) + W.addbin(b) + W.write() + # read it again; compare the result + R = Reader(dirname) + assert R.startdatetime == startdatetime + # Read binary data and compare + read_data = R.get_physical_samples_from_epoch(R.epochs[0]) + assert 'EEG' in read_data + read_data, t0 = read_data['EEG'] + assert t0 == 0.0 + assert read_data == pytest.approx(data) + layout = R.directory.filepointer('sensorLayout') + layout = XML.from_file(layout) + assert layout.name == device + # cleanup + try: + remove(join(dirname, 'info.xml')) + remove(join(dirname, 'info1.xml')) + remove(join(dirname, 'epochs.xml')) + remove(join(dirname, 'signal1.bin')) + remove(join(dirname, 'coordinates.xml')) + remove(join(dirname, 'sensorLayout.xml')) + rmdir(dirname) + except BaseException: + raise AssertionError(f""" + Clean-up failed of '{dirname}'. Were additional files written?""") diff --git a/mffpy/writer.py b/mffpy/writer.py index d5ee08a..2fe0ed8 100644 --- a/mffpy/writer.py +++ b/mffpy/writer.py @@ -21,11 +21,11 @@ from .dict2xml import dict2xml from .xml_files import XML -from .bin_writer import BinWriter +from .bin_writer import BinWriter, StreamingBinWriter from .devices import coordinates_and_sensor_layout import json -__all__ = ['Writer', 'BinWriter'] +__all__ = ['Writer', 'BinWriter', 'StreamingBinWriter'] class Writer: @@ -34,25 +34,31 @@ def __init__(self, filename: str): self.filename = filename self.files: Dict[str, Any] = {} self._bin_file_added = False + self.mffdir, self.ext = splitext(self.filename) + self.mffdir += '.mff' + self.file_created = False + + def create_directory(self): + if not self.file_created: + makedirs(self.mffdir, exist_ok=False) + self.file_created = True def write(self): """write contents to .mff/.mfz file""" - # create .mff directory - mffdir, ext = splitext(self.filename) - mffdir += '.mff' - makedirs(mffdir, exist_ok=False) + + self.create_directory() # write .xml/.bin files. For .xml files we need to set the default # namespace to avoid `ns0:` being prepended to each tag. for filename, (content, typ) in self.files.items(): if '.xml' == splitext(filename)[1]: ET.register_namespace('', typ._xmlns[1:-1]) - content.write(join(mffdir, filename), encoding='UTF-8', + content.write(join(self.mffdir, filename), encoding='UTF-8', xml_declaration=True, method='xml') # convert from .mff to .mfz - if ext == '.mfz': - check_output(['mff2mfz.py', mffdir]) + if self.ext == '.mfz': + check_output(['mff2mfz.py', self.mffdir]) def export_to_json(self, data): """export data to .json file""" From 7848a9bc08db0a2785208fc2877bdc282fbea401 Mon Sep 17 00:00:00 2001 From: Steven Shofner Date: Wed, 8 Apr 2020 07:35:23 -0700 Subject: [PATCH 2/2] Updates code to address failure when running with mypy. Updates code/comments to address issues raised by code review. --- mffpy/bin_writer.py | 22 ++++++++++++++++++++-- mffpy/header_block.py | 5 +++-- mffpy/tests/test_writer.py | 36 ++++++++++++++---------------------- mffpy/writer.py | 1 + 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/mffpy/bin_writer.py b/mffpy/bin_writer.py index fd51567..3ffb5fb 100644 --- a/mffpy/bin_writer.py +++ b/mffpy/bin_writer.py @@ -14,7 +14,7 @@ """ from os import SEEK_SET from io import BytesIO, FileIO -from typing import List, Union +from typing import List, Union, IO from os.path import join import numpy as np @@ -44,7 +44,7 @@ def __init__(self, sampling_rate: int, data_type: str = 'EEG'): self.data_type = data_type self.sampling_rate = sampling_rate self.header: Union[HeaderBlock, None] = None - self.stream = BytesIO() + self.stream: Union[IO[bytes], FileIO] = BytesIO() self.epochs: List[Epoch] = [] @property @@ -134,6 +134,7 @@ def write(self, filename: str, *args, **kwargs): # *args, **kwargs are ignored self.stream.seek(0, SEEK_SET) byts = self.stream.read() + assert isinstance(byts, bytes) with open(filename, 'wb') as fo: num_written = fo.write(byts) assert num_written == len(byts), f""" @@ -146,8 +147,25 @@ class StreamingBinWriter(BinWriter): """ def __init__(self, sampling_rate: int, mffdir: str, data_type: str = 'EEG'): + """ + + **Parameters** + + * **`sampling_rate`**: sampling rate of all channels. Sampling rate + has to fit in a 3-byte integer. See docs in `mffpy.header_block`. + + * **`data_type`**: name of the type of signal. + + * **`mffdir`**: directory of the mff recording to stream data to. + + Note: Because we are streaming the recording to disk, the folder into which it + is to be saved must have been created prior to the initialization of this class. + """ + super().__init__(sampling_rate, data_type) self.stream = FileIO(join(mffdir, self.default_filename), mode='w') def write(self, filename: str, *args, **kwargs): + # Because the recording has been streamed to a file, all that is required + # here is closing the stream self.stream.close() diff --git a/mffpy/header_block.py b/mffpy/header_block.py index bf59f67..a56273e 100644 --- a/mffpy/header_block.py +++ b/mffpy/header_block.py @@ -42,8 +42,9 @@ import struct from os import SEEK_CUR -from typing import IO +from typing import IO, Union from collections import namedtuple +from io import FileIO import numpy as np @@ -141,7 +142,7 @@ def skip(n: int): ) -def write_header_block(fp: IO[bytes], hdr: HeaderBlock): +def write_header_block(fp: Union[IO[bytes], FileIO], hdr: HeaderBlock): """write HeaderBlock `hdr` to file pointer `fp`""" fp.write(struct.pack('4i', 1, hdr.header_size, hdr.block_size, hdr.num_channels)) diff --git a/mffpy/tests/test_writer.py b/mffpy/tests/test_writer.py index 53fbace..10a4a91 100644 --- a/mffpy/tests/test_writer.py +++ b/mffpy/tests/test_writer.py @@ -112,8 +112,6 @@ def test_writer_exports_JSON(): except BaseException: raise AssertionError(f"""Clean-up failed of '{filename}'.""") -##START## - def test_streaming_writer_receives_bad_init_data(): """Test bin writer fails when initialized with non-int sampling rate""" dirname = 'testdir.mff' @@ -124,45 +122,39 @@ def test_streaming_writer_receives_bad_init_data(): rmtree(dirname) def test_streaming_writer_writes(): - dirname = 'testdir2.mff' + dirname = 'testdir3.mff' # create some data and add it to a binary writer device = 'HydroCel GSN 256 1.0' num_samples = 10 num_channels = 256 sampling_rate = 128 # create an mffpy.Writer and add a file info, and the binary file - W = Writer(dirname) - W.create_directory() - b = StreamingBinWriter(sampling_rate=sampling_rate, data_type='EEG', mffdir=dirname) + writer = Writer(dirname) + writer.create_directory() + bin_writer = StreamingBinWriter(sampling_rate=sampling_rate, data_type='EEG', mffdir=dirname) data = np.random.randn(num_channels, num_samples).astype(np.float32) - b.add_block(data) + bin_writer.add_block(data) startdatetime = datetime.strptime( '1984-02-18T14:00:10.000000+0100', XML._time_format) - W.addxml('fileInfo', recordTime=startdatetime) - W.add_coordinates_and_sensor_layout(device) - W.addbin(b) - W.write() + writer.addxml('fileInfo', recordTime=startdatetime) + writer.add_coordinates_and_sensor_layout(device) + writer.addbin(bin_writer) + writer.write() # read it again; compare the result - R = Reader(dirname) - assert R.startdatetime == startdatetime + reader = Reader(dirname) + assert reader.startdatetime == startdatetime # Read binary data and compare - read_data = R.get_physical_samples_from_epoch(R.epochs[0]) + read_data = reader.get_physical_samples_from_epoch(reader.epochs[0]) assert 'EEG' in read_data read_data, t0 = read_data['EEG'] assert t0 == 0.0 assert read_data == pytest.approx(data) - layout = R.directory.filepointer('sensorLayout') + layout = reader.directory.filepointer('sensorLayout') layout = XML.from_file(layout) assert layout.name == device # cleanup try: - remove(join(dirname, 'info.xml')) - remove(join(dirname, 'info1.xml')) - remove(join(dirname, 'epochs.xml')) - remove(join(dirname, 'signal1.bin')) - remove(join(dirname, 'coordinates.xml')) - remove(join(dirname, 'sensorLayout.xml')) - rmdir(dirname) + rmtree(dirname) except BaseException: raise AssertionError(f""" Clean-up failed of '{dirname}'. Were additional files written?""") diff --git a/mffpy/writer.py b/mffpy/writer.py index 2fe0ed8..e2a5d0b 100644 --- a/mffpy/writer.py +++ b/mffpy/writer.py @@ -39,6 +39,7 @@ def __init__(self, filename: str): self.file_created = False def create_directory(self): + """Creates the directory for the recording.""" if not self.file_created: makedirs(self.mffdir, exist_ok=False) self.file_created = True