diff --git a/neo/rawio/openephysrawio.py b/neo/rawio/openephysrawio.py index f2b370ff9..ae6cfcb35 100644 --- a/neo/rawio/openephysrawio.py +++ b/neo/rawio/openephysrawio.py @@ -53,25 +53,31 @@ class OpenEphysRawIO(BaseRawIO): Limitation : * Works only if all continuous channels have the same sampling rate, which is a reasonable hypothesis. - * When the recording is stopped and restarted all continuous files will contain gaps. - Ideally this would lead to a new Segment but this use case is not implemented due to its - complexity. - Instead it will raise an error. - - Special cases: - * Normally all continuous files have the same first timestamp and length. In situations - where it is not the case all files are clipped to the smallest one so that they are all - aligned, - and a warning is emitted. + * A recording can contain gaps due to USB stream loss under high CPU load while recording. + These gaps are checked channel by channel, which makes parse_header() slow. + If gaps are detected then they are filled with zeros, but reading the signals will be much slower. + + Parameters + ---------- + dirname: str + The directory where the files are stored. + ignore_timestamps_errors: bool + (deprecated) This parameter is not used anymore. + fill_gap_value: int + When gaps are detected in continuous files, the gap is filled with this value. + Default is 0. 
+ """ # file formats used by openephys extensions = ['continuous', 'openephys', 'spikes', 'events', 'xml'] rawmode = 'one-dir' - def __init__(self, dirname='', ignore_timestamps_errors=False): + def __init__(self, dirname='', ignore_timestamps_errors=None, fill_gap_value=0): BaseRawIO.__init__(self) self.dirname = dirname - self._ignore_timestamps_errors = ignore_timestamps_errors + self.fill_gap_value = int(fill_gap_value) + if ignore_timestamps_errors is not None: + self.logger.warning("OpenEphysRawIO ignore_timestamps_errors=True/False is not used anymore") def _source_name(self): return self.dirname @@ -84,12 +90,14 @@ def _parse_header(self): self._sigs_memmap = {} self._sig_length = {} self._sig_timestamp0 = {} + self._sig_has_gap = {} + self._gap_mode = False signal_channels = [] oe_indices = sorted(list(info['continuous'].keys())) for seg_index, oe_index in enumerate(oe_indices): self._sigs_memmap[seg_index] = {} + self._sig_has_gap[seg_index] = {} - all_sigs_length = [] all_first_timestamps = [] all_last_timestamps = [] all_samplerate = [] @@ -109,18 +117,18 @@ def _parse_header(self): dtype=continuous_dtype, shape=(size, )) self._sigs_memmap[seg_index][chan_index] = data_chan - all_sigs_length.append(data_chan.size * RECORD_SIZE) all_first_timestamps.append(data_chan[0]['timestamp']) - all_last_timestamps.append(data_chan[-1]['timestamp']) + all_last_timestamps.append(data_chan[-1]['timestamp'] + RECORD_SIZE) all_samplerate.append(chan_info['sampleRate']) # check for continuity (no gaps) diff = np.diff(data_chan['timestamp']) - if not np.all(diff == RECORD_SIZE) and not self._ignore_timestamps_errors: - raise ValueError( - 'Not continuous timestamps for {}. 
' \ - 'Maybe because recording was paused/stopped.'.format(continuous_filename) - ) + channel_has_gaps = not np.all(diff == RECORD_SIZE) + self._sig_has_gap[seg_index][chan_index] = channel_has_gaps + + if channel_has_gaps: + # protect against strange timestamp block like in file 'OpenEphys_SampleData_3' CH32 + assert np.median(diff) == RECORD_SIZE, f"This file has a non valid data block size for channel {chan_id}, this case cannot be handled" if seg_index == 0: # add in channel list @@ -130,46 +138,39 @@ def _parse_header(self): units = 'V' signal_channels.append((ch_name, chan_id, chan_info['sampleRate'], 'int16', units, chan_info['bitVolts'], 0., processor_id)) - - # In some cases, continuous do not have the same length because - # one record block is missing when the "OE GUI is freezing" - # So we need to clip to the smallest files - if not all(all_sigs_length[0] == e for e in all_sigs_length) or\ - not all(all_first_timestamps[0] == e for e in all_first_timestamps): - + + if any(self._sig_has_gap[seg_index].values()): + channel_with_gaps = list(self._sig_has_gap[seg_index].keys()) + self.logger.warning(f"This OpenEphys dataset contains gaps for some channels {channel_with_gaps} in segment {seg_index} the read will be slow") + self._gap_mode = True + + + if not all(all_first_timestamps[0] == e for e in all_first_timestamps) or \ + not all(all_last_timestamps[0] == e for e in all_last_timestamps): + # In some cases, continuous do not have the same length because + # we need to clip self.logger.warning('Continuous files do not have aligned timestamps; ' 'clipping to make them aligned.') - first, last = -np.inf, np.inf + first = max(all_first_timestamps) + last = min(all_last_timestamps) for chan_index in self._sigs_memmap[seg_index]: data_chan = self._sigs_memmap[seg_index][chan_index] - if data_chan[0]['timestamp'] > first: - first = data_chan[0]['timestamp'] - if data_chan[-1]['timestamp'] < last: - last = data_chan[-1]['timestamp'] - - all_sigs_length = [] - 
all_first_timestamps = [] - all_last_timestamps = [] - for chan_index in self._sigs_memmap[seg_index]: - data_chan = self._sigs_memmap[seg_index][chan_index] - keep = (data_chan['timestamp'] >= first) & (data_chan['timestamp'] <= last) + keep = (data_chan['timestamp'] >= first) & (data_chan['timestamp'] < last) data_chan = data_chan[keep] self._sigs_memmap[seg_index][chan_index] = data_chan - all_sigs_length.append(data_chan.size * RECORD_SIZE) - all_first_timestamps.append(data_chan[0]['timestamp']) - all_last_timestamps.append(data_chan[-1]['timestamp']) - - # check that all signals have the same length and timestamp0 for this segment - assert all(all_sigs_length[0] == e for e in all_sigs_length),\ - 'Not all signals have the same length' - assert all(all_first_timestamps[0] == e for e in all_first_timestamps),\ - 'Not all signals have the same first timestamp' + else: + # no clip + first = all_first_timestamps[0] + last = all_last_timestamps[0] + + + # check unique sampling rate assert all(all_samplerate[0] == e for e in all_samplerate),\ 'Not all signals have the same sample rate' - self._sig_length[seg_index] = all_sigs_length[0] - self._sig_timestamp0[seg_index] = all_first_timestamps[0] + self._sig_length[seg_index] = last - first + self._sig_timestamp0[seg_index] = first if len(signal_channels) > 0: signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype) @@ -316,11 +317,6 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, if i_stop is None: i_stop = self._sig_length[seg_index] - block_start = i_start // RECORD_SIZE - block_stop = i_stop // RECORD_SIZE + 1 - sl0 = i_start % RECORD_SIZE - sl1 = sl0 + (i_stop - i_start) - stream_id = self.header['signal_streams'][stream_index]['id'] mask = self.header['signal_channels']['stream_id'] global_channel_indexes, = np.nonzero(mask == stream_id) @@ -328,11 +324,66 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes = slice(None) 
global_channel_indexes = global_channel_indexes[channel_indexes] - sigs_chunk = np.zeros((i_stop - i_start, len(global_channel_indexes)), dtype='int16') - for i, global_chan_index in enumerate(global_channel_indexes): - data = self._sigs_memmap[seg_index][global_chan_index] - sub = data[block_start:block_stop] - sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1] + if not self._gap_mode: + sigs_chunk = np.zeros((i_stop - i_start, len(global_channel_indexes)), dtype='int16') + # previous behavior block index are linear + block_start = i_start // RECORD_SIZE + block_stop = i_stop // RECORD_SIZE + 1 + sl0 = i_start % RECORD_SIZE + sl1 = sl0 + (i_stop - i_start) + + for i, global_chan_index in enumerate(global_channel_indexes): + data = self._sigs_memmap[seg_index][global_chan_index] + sub = data[block_start:block_stop] + sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1] + else: + sigs_chunk = np.full(shape=(i_stop - i_start, len(global_channel_indexes)), + fill_value=self.fill_gap_value, + dtype='int16') + # slow mode + for i, global_chan_index in enumerate(global_channel_indexes): + data = self._sigs_memmap[seg_index][global_chan_index] + timestamp0 = data[0]['timestamp'] + + # find first block + block0 = np.searchsorted(data['timestamp'], timestamp0 + i_start, side='right') - 1 + block0_pos = data[block0]['timestamp'] - timestamp0 + + if i_start - block0_pos > RECORD_SIZE: + # the block has gap!! 
+ pos = - ((i_start - block0_pos) % RECORD_SIZE) + block_index = block0 + 1 + else: + # the first block do not have gaps + shift0 = i_start - block0_pos + + if shift0 + (i_stop - i_start) < RECORD_SIZE: + # protect when only one small block + pos = (i_stop - i_start) + sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:shift0 + pos] + else: + + pos = RECORD_SIZE - shift0 + sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:] + block_index = block0 + 1 + + # full block + while block_index < data.size and data[block_index]['timestamp'] - timestamp0 < i_stop - RECORD_SIZE: + diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp'] + if diff > RECORD_SIZE: + # gap detected need jump + pos += diff - RECORD_SIZE + + sigs_chunk[:, i][pos:pos + RECORD_SIZE] = data[block_index]['samples'][:] + pos += RECORD_SIZE + block_index += 1 + + # last block + if pos < i_stop - i_start: + diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp'] + if diff == RECORD_SIZE: + # ensure no gaps for last block + sigs_chunk[:, i][pos:] = data[block_index]['samples'][:i_stop - i_start - pos] return sigs_chunk @@ -524,9 +575,12 @@ def explore_folder(dirname): chan_ids_by_type[chan_type] = [chan_id] filenames_by_type[chan_type] = [continuous_filename] chan_types = list(chan_ids_by_type.keys()) - if chan_types[0] == 'ADC': - # put ADC at last position - chan_types = chan_types[1:] + chan_types[0:1] + + if 'CH' in chan_types: + # force CH at beginning + chan_types.remove('CH') + chan_types = ['CH'] + chan_types + ordered_continuous_filenames = [] for chan_type in chan_types: local_order = np.argsort(chan_ids_by_type[chan_type]) diff --git a/neo/test/rawiotest/rawio_compliance.py b/neo/test/rawiotest/rawio_compliance.py index a40cb5d27..0a2e40d20 100644 --- a/neo/test/rawiotest/rawio_compliance.py +++ b/neo/test/rawiotest/rawio_compliance.py @@ -243,6 +243,7 @@ def read_analogsignals(reader): chunks.append(raw_chunk) i_start += chunksize chunk_raw_sigs 
= np.concatenate(chunks, axis=0) + np.testing.assert_array_equal(ref_raw_sigs, chunk_raw_sigs) diff --git a/neo/test/rawiotest/test_openephysrawio.py b/neo/test/rawiotest/test_openephysrawio.py index b53c419cf..80391bba4 100644 --- a/neo/test/rawiotest/test_openephysrawio.py +++ b/neo/test/rawiotest/test_openephysrawio.py @@ -11,23 +11,11 @@ class TestOpenEphysRawIO(BaseTestRawIO, unittest.TestCase, ): ] entities_to_test = [ 'openephys/OpenEphys_SampleData_1', - # 'OpenEphys_SampleData_2_(multiple_starts)', # This not implemented this raise error - # 'OpenEphys_SampleData_3', + # this file has gaps and this is now handled correctly + 'openephys/OpenEphys_SampleData_2_(multiple_starts)', + # 'openephys/OpenEphys_SampleData_3', ] - def test_raise_error_if_discontinuous_files(self): - # the case of discontinuous signals is NOT cover by the IO for the moment - # It must raise an error - reader = OpenEphysRawIO(dirname=self.get_local_path( - 'openephys/OpenEphys_SampleData_2_(multiple_starts)')) - with self.assertRaises(ValueError): - reader.parse_header() - # if ignore_timestamps_errors=True, no exception is raised - reader = OpenEphysRawIO(dirname=self.get_local_path( - 'openephys/OpenEphys_SampleData_2_(multiple_starts)'), - ignore_timestamps_errors=True) - reader.parse_header() - def test_raise_error_if_strange_timestamps(self): # In this dataset CH32 have strange timestamps