From cf02b26acc5606d1713bf86bc9de6e55f24ce000 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Mon, 23 Oct 2023 18:08:07 -0400 Subject: [PATCH] blackrockrawio - quote consistency; add a few comments; fix a potential incorrect variable access. --- neo/rawio/blackrockrawio.py | 48 +++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/neo/rawio/blackrockrawio.py b/neo/rawio/blackrockrawio.py index 2a2d5257d..11b9107b9 100644 --- a/neo/rawio/blackrockrawio.py +++ b/neo/rawio/blackrockrawio.py @@ -7,6 +7,7 @@ * Samuel Garcia - third version * Lyuba Zehl, Michael Denker - fourth version * Samuel Garcia, Julia Srenger - fifth version + * Chadwick Boulay - FileSpec 3.0 and 3.0-PTP This IO supports reading only. This IO is able to read: @@ -17,6 +18,8 @@ * 2.1 * 2.2 * 2.3 + * 3.0 + * 3.0 with PTP timestamps (Gemini systems) The neural data channels are 1 - 128. The analog inputs are 129 - 144. (129 - 137 AC coupled, 138 - 144 DC coupled) @@ -322,6 +325,8 @@ def _parse_header(self): self.__nsx_basic_header[nsx_nb], self.__nsx_ext_header[nsx_nb] = \ self.__nsx_header_reader[spec](nsx_nb) + # The only way to know if it is the PTP-variant of file spec 3.0 + # is to check for nanosecond timestamp resolution. if 'timestamp_resolution' in self.__nsx_basic_header[nsx_nb].dtype.names and \ self.__nsx_basic_header[nsx_nb]['timestamp_resolution'] == 1_000_000_000: nsx_dataheader_reader = self.__nsx_dataheader_reader["3.0-ptp"] @@ -332,7 +337,7 @@ def _parse_header(self): # for nsxdef get_analogsignal_shape(self, block_index, seg_index): self.__nsx_data_header[nsx_nb] = nsx_dataheader_reader(nsx_nb) - # nsx_to_load can be either int, list, 'max', all' (aka None) + # nsx_to_load can be either int, list, 'max', 'all' (aka None) # here make a list only if self.nsx_to_load is None or self.nsx_to_load == 'all': self.nsx_to_load = list(self._avail_nsx) @@ -377,6 +382,8 @@ def _parse_header(self): if len(self.nsx_to_load) > 0: for nsx_nb in self.nsx_to_load: spec = self.__nsx_spec[nsx_nb] + # The only way to know if it is the PTP-variant of file spec 3.0 + # is to check for nanosecond timestamp resolution. if 'timestamp_resolution' in self.__nsx_basic_header[nsx_nb].dtype.names and \ self.__nsx_basic_header[nsx_nb]['timestamp_resolution'] == 1_000_000_000: _data_reader_fun = self.__nsx_data_reader['3.0-ptp'] @@ -440,6 +447,7 @@ def _parse_header(self): for data_bl in range(self._nb_segment): t_stop = 0. for nsx_nb in self.nsx_to_load: + spec = self.__nsx_spec[nsx_nb] if 'timestamp_resolution' in self.__nsx_basic_header[nsx_nb].dtype.names: ts_res = self.__nsx_basic_header[nsx_nb]['timestamp_resolution'] elif spec == '2.1': @@ -447,7 +455,7 @@ def _parse_header(self): else: ts_res = 30_000 period = self.__nsx_basic_header[nsx_nb]['period'] - sec_per_samp = period / 30_000 + sec_per_samp = period / 30_000 # Maybe 30_000 should be ['sample_resolution'] n_samples = self.nsx_datas[nsx_nb][data_bl].shape[0] if self.__nsx_data_header[nsx_nb] is None: t_start = 0. @@ -988,27 +996,27 @@ def __read_nsx_dataheader_variant_c( offset = self.__nsx_basic_header[nsx_nb]['bytes_in_headers'] ptp_dt = [ - ("reserved", "uint8"), - ("timestamps", "uint64"), - ("num_data_points", "uint32"), - ("samples", "int16", self.__nsx_basic_header[nsx_nb]['channel_count']) + ('reserved', 'uint8'), + ('timestamps', 'uint64'), + ('num_data_points', 'uint32'), + ('samples', 'int16', self.__nsx_basic_header[nsx_nb]['channel_count']) ] npackets = int((filesize - offset) / np.dtype(ptp_dt).itemsize) - struct_arr = np.memmap(filename, dtype=ptp_dt, shape=npackets, offset=offset, mode="r") + struct_arr = np.memmap(filename, dtype=ptp_dt, shape=npackets, offset=offset, mode='r') - if not np.all(struct_arr["num_data_points"] == 1): + if not np.all(struct_arr['num_data_points'] == 1): # some packets have more than 1 sample. Not actually ptp. Revert to non-ptp variant. return self.__read_nsx_dataheader_variant_b(nsx_nb, filesize=filesize, offset=offset) # It is still possible there was a data break and the file has multiple segments. # We can no longer rely on the presence of a header indicating a new segment, # so we look for timestamp differences greater than double the expected interval. - _period = self.__nsx_basic_header[nsx_nb]["period"] - _clock_rate = self.__nsx_basic_header[nsx_nb]["timestamp_resolution"] - clk_per_samp = _period * _clock_rate / 30_000 + _period = self.__nsx_basic_header[nsx_nb]['period'] + _clock_rate = self.__nsx_basic_header[nsx_nb]['timestamp_resolution'] + clk_per_samp = _period * _clock_rate / 30_000 # maybe 30_000 should be ['sample_resolution'] seg_thresh_clk = 2 * clk_per_samp seg_starts = np.hstack( - (0, 1 + np.argwhere(np.diff(struct_arr["timestamps"]) > seg_thresh_clk).flatten())) + (0, 1 + np.argwhere(np.diff(struct_arr['timestamps']) > seg_thresh_clk).flatten())) for seg_ix, seg_start_idx in enumerate(seg_starts): if seg_ix < (len(seg_starts) - 1): seg_stop_idx = seg_starts[seg_ix + 1] @@ -1020,7 +1028,7 @@ def __read_nsx_dataheader_variant_c( shape=num_data_pts, offset=seg_offset, mode="r") data_header[seg_ix] = { 'header': None, - 'timestamp': seg_struct_arr["timestamps"], # Note, this is an array, not a scalar + 'timestamp': seg_struct_arr['timestamps'], # Note, this is an array, not a scalar 'nb_data_points': num_data_pts, 'offset_to_data_block': seg_offset } @@ -1076,10 +1084,10 @@ def __read_nsx_data_variant_c(self, nsx_nb): filename = '.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb]) ptp_dt = [ - ("reserved", "uint8"), - ("timestamps", "uint64"), - ("num_data_points", "uint32"), - ("samples", "int16", self.__nsx_basic_header[nsx_nb]['channel_count']) + ('reserved', 'uint8'), + ('timestamps', 'uint64'), + ('num_data_points', 'uint32'), + ('samples', 'int16', self.__nsx_basic_header[nsx_nb]['channel_count']) ] data = {} @@ -1087,13 +1095,13 @@ def __read_nsx_data_variant_c(self, nsx_nb): struct_arr = np.memmap( filename, dtype=ptp_dt, - shape=bl_header["nb_data_points"], - offset=bl_header["offset_to_data_block"], mode="r" + shape=bl_header['nb_data_points'], + offset=bl_header['offset_to_data_block'], mode='r' ) # Does this concretize the data? # If yes then investigate np.ndarray with buffer=file, # offset=offset+13, and strides that skips 13-bytes per row. - data[bl_id] = struct_arr["samples"] + data[bl_id] = struct_arr['samples'] return data