Read after write dev #77

Closed
@@ -56,18 +56,20 @@
# sessions_to_run = list(set(brain_wide_sessions) - set(already_written_processed_sessions))

nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb"
- nwbfile_path.parent.mkdir(exist_ok=True)
+ os.makedirs(nwbfile_path.parent, exist_ok=True)
Member:

Suggested change:
- os.makedirs(nwbfile_path.parent, exist_ok=True)
+ nwbfile_path.parent.mkdir(exist_ok=True)

Why change this? Pathlib is generally more convenient

Member:

Can add parents=True if that is necessary
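A minimal sketch contrasting the two idioms under discussion (the example path is hypothetical; os.makedirs creates missing intermediate directories by default, while Path.mkdir needs parents=True to do the same):

import os
from pathlib import Path

nwbfile_path = Path("/data/nwbfiles/some_session/some_session.nwb")  # hypothetical

# os flavor: also creates missing intermediate directories
os.makedirs(nwbfile_path.parent, exist_ok=True)

# pathlib flavor: parents=True makes it create intermediate directories too
nwbfile_path.parent.mkdir(parents=True, exist_ok=True)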

# nwbfile_path.parent.mkdir(exist_ok=True)

stub_test: bool = False
cleanup: bool = False

# assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!"

nwbfile_path.parent.mkdir(exist_ok=True)

# Download behavior and spike sorted data for this session
session_path = base_path / "ibl_conversion" / session
cache_folder = base_path / "ibl_conversion" / session / "cache"
os.makedirs(session_path, exist_ok=True)
Member:

This should exist at this point from the previous calls

os.makedirs(cache_folder, exist_ok=True)

session_one = ONE(
    base_url="https://openalyx.internationalbrainlab.org",
    password="international",
ibl_to_nwb/brainwide_map/read_after_write.py (new file, 361 additions, 0 deletions)
@@ -0,0 +1,361 @@
import os
import numpy as np
import pandas as pd
from pathlib import Path
import h5py
from pynwb import NWBHDF5IO, NWBFile
from one.api import ONE
import logging
from typing import Optional
from numpy import testing

def check_arrays(array_a: np.ndarray, array_b: np.ndarray, full_check: bool = False):
Member:

This style of redirection is not recommended (a helper function just to decide between uses of numpy.testing)

TBH, any data array in the processed files will be small enough that a full (non-subset) numpy.testing.assert_allclose will always be fast enough; just do that.

The only question for submitting would be if you want to do something like this on the raw data, but that's probably important enough for it to be a separate check altogether

Also, the NWB write step should not affect floating point precision (and if it did, we would actually want to be informed about that), so I recommend using numpy.testing.assert_array_equal instead. Or, in general, numpy.testing.assert_array_almost_equal, since it also checks aspects of the arrays beyond the data values.

Collaborator (Author):

> The only question for submitting would be if you want to do something like this on the raw data, but that's probably important enough for it to be a separate check altogether

I had this in mind when I was writing the function, thinking that small speed improvements would probably add up to large total time savings. That said, I haven't done any benchmarking yet (besides the np.searchsorted approach to accessing the spike times).

> Also, the NWB write step should not affect floating point precision (and if it did, we would actually want to be informed about that), so I recommend using numpy.testing.assert_array_equal instead. Or, in general, numpy.testing.assert_array_almost_equal, since it also checks aspects of the arrays beyond the data values.

I wasn't entirely sure whether this holds across all architectures, or whether we need to account for some other form of rounding imprecision, so I thought allclose would allow adjusting for that if necessary; if exact equality is desired, rtol=0 should do that.
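For reference, a minimal illustration of the tolerance knobs being discussed (an editor's sketch, not part of the diff):

import numpy as np
from numpy import testing

a = np.array([1.0, 2.0, np.nan])
b = a.copy()

# with both tolerances at zero, allclose passes only on exact equality
testing.assert_allclose(a, b, rtol=0, atol=0)

# same intent, clearer name (both treat NaNs in matching positions as equal)
testing.assert_array_equal(a, b)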

Member:

One way to think of the NWB conversion process (for bulk data anyway) is that 'bytes in' are still just 'bytes out', so if you decoded the original SpikeGLX binaries into their raw bytes (things that look like \xff\x01r), those sequences should be exactly the same as when you read them from the data array after it's written to the NWB file

Floating point errors would mostly come up if you performed operations like scaling (such as by the conversion factor to get values into scientific units)

Or if you used lossy compression to save more space in the file, though that would often exceed standard 'floating point' error

"""checks if two arrays contain the same numerical values

Args:
array_a (np.ndarray): _description_
array_b (np.ndarray): _description_
full_check (bool, optional): If True, compares all values of the arrays. If False, checks a small subsample only. Defaults to False.
"""

# if full check, check all samples
if full_check:
testing.assert_allclose(array_a, array_b)

# check just a random subset of samples
else:
inds = np.random.randint(0, np.prod(array_a.shape), size=10)
testing.assert_allclose(np.ravel(array_a)[inds], np.ravel(array_b)[inds])
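As a side note to the thread above, a minimal sketch of the 'bytes in, bytes out' argument: an HDF5 write without scaling or lossy compression is bit-exact, so numpy.testing.assert_array_equal is sufficient (the file name is hypothetical; h5py is already imported in this module):

import h5py
import numpy as np
from numpy import testing

data_in = np.random.randn(1000)

# write the raw array to an HDF5 file, as the NWB backend does
with h5py.File("roundtrip_check.h5", "w") as f:  # hypothetical file name
    f.create_dataset("data", data=data_in)

# read it back: no scaling or lossy compression was applied,
# so the stored bytes (and hence the values) must match exactly
with h5py.File("roundtrip_check.h5", "r") as f:
    data_out = f["data"][:]

testing.assert_array_equal(data_in, data_out)  # exact, not approximate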



def check_series(series_a: pd.Series, series_b: pd.Series, full_check: bool = False):
Member:

Again, not really recommended to define a one-line redirection function. Just making more work than necessary for ourselves here

Collaborator (Author):

This had somewhat historical reasons, and I left it in as a stub. When I started coding I wasn't even aware that numpy.testing existed, so the initial data flow was a chain of calls going from a complex datatype like a pd.DataFrame down to successively simpler ones, with an assert() call at the end taking care of comparing the selected numbers.

Things have changed, but I left the general structure up for discussion in case it could still be useful for a case I haven't thought about.

"""checks if two pd.Series contain the same numerical values. Checks if NaN values are the same.

Args:
series_a (pd.Series): _description_
series_b (pd.Series): _description_
full_check (bool, optional): _description_. Defaults to False.
"""

# -> all of this functionality is now moved to check_arrays()
# this function as of now is obsolete but kept for potential future integration

check_arrays(series_a.values, series_b.values, full_check=full_check)


def check_tables(table_a: pd.DataFrame, table_b: pd.DataFrame, naming_map: dict = None, full_check: bool = False):
Member:

If this is for checking that two pandas data frames are equivalent, why not use pandas.testing.assert_frame_equal? (And resolve any column name mapping by creating a copy of the NWB or ONE frame and renaming the columns on the copy passed to the assertion?)

Collaborator (Author):

:) thanks for pointing me towards pandas.testing

"""checks if two pd.DataFrames contain the same numerical values. Performs an "is in" comparison: checks if data of table_a is present in table_b.

Args:
table_a (pd.DataFrame): _description_
table_b (pd.DataFrame): _description_
naming_map (dict, optional): if naming map is given, it is used to map the names of columns of table_a to those of table_b. Defaults to None, checks if columns are identical.
full_check (bool, optional): _description_. Defaults to False.
Member:

An index subset approach should especially not be needed for tables since any data here is already loaded into RAM (making any non-subset assertion super fast)

"""
# convert column names if necessary
if naming_map is not None:
table_a_cols = table_a.columns
table_b_cols = [naming_map[col] for col in table_a.columns]
else:
# if no map is given, columns have to be the same (but not in the same order)
assert np.all([col in table_b.columns for col in table_a.columns])
table_a_cols = table_a.columns
table_b_cols = table_a.columns

for col_a, col_b in zip(table_a_cols, table_b_cols):
check_series(table_a[col_a], table_b[col_b], full_check=full_check)
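Following the reviewer's suggestion, a minimal sketch (not the PR's implementation) of delegating the whole comparison to pandas.testing.assert_frame_equal, assuming naming_map maps the columns of the first frame to those of the second as above:

import pandas as pd

def check_tables_with_pandas(table_a: pd.DataFrame, table_b: pd.DataFrame, naming_map: dict = None):
    # rename a copy of table_a so its columns line up with table_b
    if naming_map is not None:
        table_a = table_a.rename(columns=naming_map)
    # compare the shared columns, aligned in the same order
    cols = [col for col in table_a.columns if col in table_b.columns]
    pd.testing.assert_frame_equal(
        table_a[cols].reset_index(drop=True),
        table_b[cols].reset_index(drop=True),
        check_dtype=False,  # the NWB round trip may, e.g., widen int32 to int64
    )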


def test_WheelInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `WheelInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies all values; if False, checks a small random subsample. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): dataset revision to load from ONE (currently unused). Defaults to None.
    """

    # wheel position
    data_nwb = (
        nwbfile.processing["behavior"].data_interfaces["CompassDirection"].spatial_series["WheelPositionSeries"].data[:]
    )
    data_one = one.load_dataset(eid, "_ibl_wheel.position", collection="alf")
    check_arrays(data_nwb, data_one, full_check=full_check)

    # wheel timestamps
    data_nwb = (
        nwbfile.processing["behavior"]
        .data_interfaces["CompassDirection"]
        .spatial_series["WheelPositionSeries"]
        .timestamps[:]
    )
    data_one = one.load_dataset(eid, "_ibl_wheel.timestamps", collection="alf")
    check_arrays(data_nwb, data_one, full_check=full_check)

    # wheel moves
    table = nwbfile.processing["behavior"].data_interfaces["WheelMovementIntervals"][:]

    # intervals
    data_nwb = table[["start_time", "stop_time"]].values
    data_one = one.load_dataset(eid, "_ibl_wheelMoves.intervals", collection="alf")
    check_arrays(data_nwb, data_one, full_check=full_check)

    # peak amplitude
    data_nwb = table["peak_amplitude"].values
    data_one = one.load_dataset(eid, "_ibl_wheelMoves.peakAmplitude", collection="alf")
    check_arrays(data_nwb, data_one, full_check=full_check)


def test_LickInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `LickInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies all values; if False, checks a small random subsample. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): dataset revision to load from ONE (currently unused). Defaults to None.
    """
    table = nwbfile.processing["behavior"].data_interfaces["LickTimes"][:]
    data_nwb = table["lick_time"].values
    data_one = one.load_dataset(eid, "licks.times")
    check_arrays(data_nwb, data_one, full_check=full_check)


def test_RoiMotionEnergyInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `RoiMotionEnergyInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies all values; if False, checks a small random subsample. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): dataset revision to load from ONE (currently unused). Defaults to None.
    """
    camera_views = ["body", "left", "right"]

    for view in camera_views:
        # data
        data_nwb = nwbfile.processing["behavior"].data_interfaces["%sCameraMotionEnergy" % view.capitalize()].data[:]
        data_one = one.load_dataset(eid, "%sCamera.ROIMotionEnergy" % view, collection="alf")
        check_arrays(data_nwb, data_one, full_check=full_check)

        # timestamps
        data_nwb = (
            nwbfile.processing["behavior"].data_interfaces["%sCameraMotionEnergy" % view.capitalize()].timestamps[:]
        )
        data_one = one.load_dataset(eid, "_ibl_%sCamera.times" % view, collection="alf")
        check_arrays(data_nwb, data_one, full_check=full_check)


def test_IblPoseEstimationInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `IblPoseEstimationInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies all values; if False, checks a small random subsample. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): dataset revision to load from ONE (currently unused). Defaults to None.
    """

    camera_views = ["body", "left", "right"]

    for view in camera_views:
        nodes = nwbfile.processing["behavior"].data_interfaces["PoseEstimation%sCamera" % view.capitalize()].nodes[:]

        for node in nodes:
            # x
            data_nwb = (
                nwbfile.processing["behavior"]
                .data_interfaces["PoseEstimation%sCamera" % view.capitalize()]
                .pose_estimation_series[node]
                .data[:][:, 0]
            )
            data_one = one.load_dataset(eid, "_ibl_%sCamera.dlc.pqt" % view, collection="alf")["%s_x" % node].values
            check_arrays(data_nwb, data_one, full_check=full_check)

            # y
            data_nwb = (
                nwbfile.processing["behavior"]
                .data_interfaces["PoseEstimation%sCamera" % view.capitalize()]
                .pose_estimation_series[node]
                .data[:][:, 1]
            )
            data_one = one.load_dataset(eid, "_ibl_%sCamera.dlc.pqt" % view, collection="alf")["%s_y" % node].values
            check_arrays(data_nwb, data_one, full_check=full_check)

            # confidence
            data_nwb = (
                nwbfile.processing["behavior"]
                .data_interfaces["PoseEstimation%sCamera" % view.capitalize()]
                .pose_estimation_series[node]
                .confidence[:]
            )
            data_one = one.load_dataset(eid, "_ibl_%sCamera.dlc.pqt" % view, collection="alf")[
                "%s_likelihood" % node
            ].values
            check_arrays(data_nwb, data_one, full_check=full_check)

            # timestamps
            data_nwb = (
                nwbfile.processing["behavior"]
                .data_interfaces["PoseEstimation%sCamera" % view.capitalize()]
                .pose_estimation_series[node]
                .timestamps[:]
            )
            data_one = one.load_dataset(eid, "_ibl_%sCamera.times" % view, collection="alf")
            check_arrays(data_nwb, data_one, full_check=full_check)


def test_BrainwideMapTrialsInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `BrainwideMapTrialsInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies all values; if False, checks a small random subsample. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): dataset revision to load from ONE (currently unused). Defaults to None.
    """

    data_nwb = nwbfile.trials[:]
    data_one = one.load_dataset(eid, "_ibl_trials.table", collection="alf")

    naming_map = {
        "start_time": "intervals_0",
        "stop_time": "intervals_1",
        "choice": "choice",
        "feedback_type": "feedbackType",
        "reward_volume": "rewardVolume",
        "contrast_left": "contrastLeft",
        "contrast_right": "contrastRight",
        "probability_left": "probabilityLeft",
        "feedback_time": "feedback_times",
        "response_time": "response_times",
        # 'stim_off_time': '',
        "stim_on_time": "stimOn_times",
        "go_cue_time": "goCue_times",
        "first_movement_time": "firstMovement_times",
    }
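    # invert the map: the dict above goes from NWB column names to ONE column
    # names, but check_tables needs to map table_a (ONE) onto table_b (NWB)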
    naming_map = {v: k for k, v in naming_map.items()}

    check_tables(data_one, data_nwb, naming_map=naming_map, full_check=full_check)


def test_PupilTrackingInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `PupilTrackingInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies all values; if False, checks a small random subsample. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): dataset revision to load from ONE (currently unused). Defaults to None.
    """

    camera_views = ["left", "right"]
    for view in camera_views:
        # raw
        data_nwb = (
            nwbfile.processing["behavior"]
            .data_interfaces["%sPupilTracking" % view.capitalize()]
            .time_series["%sRawPupilDiameter" % view.capitalize()]
            .data[:]
        )
        data_one = one.load_dataset(eid, "_ibl_%sCamera.features.pqt" % view, collection="alf")[
            "pupilDiameter_raw"
        ].values

        check_arrays(data_nwb, data_one, full_check=full_check)

        # smooth
        data_nwb = (
            nwbfile.processing["behavior"]
            .data_interfaces["%sPupilTracking" % view.capitalize()]
            .time_series["%sSmoothedPupilDiameter" % view.capitalize()]
            .data[:]
        )
        data_one = one.load_dataset(eid, "_ibl_%sCamera.features.pqt" % view, collection="alf")[
            "pupilDiameter_smooth"
        ].values

        check_arrays(data_nwb, data_one, full_check=full_check)


def test_IblSortingInterface(
    nwbfile: NWBFile, one: ONE, eid: str, full_check: bool = False, verbose: bool = False, revision: str = None
):
    """read-after-write test for the datainterface `IblSortingInterface`.

    Args:
        nwbfile (NWBFile): nwbfile object.
        one (ONE): ONE object.
        eid (str): experiment uuid / equivalent to session_id.
        full_check (bool, optional): if True, verifies the spike times of all units; if False, checks a random sample of 20 units. Defaults to False.
        verbose (bool, optional): if True, print verbose output (currently unused). Defaults to False.
        revision (str, optional): spike sorting revision appended to the ONE collection path. Defaults to None.
    """

    units_table = nwbfile.units[:]
    probe_names = units_table["probe_name"].unique()

    if full_check:
        inds = units_table.index
    else:
        inds = units_table.sample(20).index

    spike_times = {}
    spike_clusters = {}
    cluster_uuids = {}

    # for fast spike extraction
    def get_spikes_for_cluster(spike_clusters, spike_times, cluster):
        # requires that spike_times and spike_clusters are sorted
        start_ix, stop_ix = np.searchsorted(spike_clusters, [cluster, cluster + 1])
        return np.sort(spike_times[start_ix:stop_ix])
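    # note: because spike_clusters is pre-sorted below, np.searchsorted finds the
    # contiguous [start, stop) block of spikes for `cluster` in O(log n) instead of
    # scanning all spikes with a boolean mask (O(n) per cluster)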

    # get and prep data once
    for probe_name in probe_names:

        # include revision TODO FIXME this will likely change - check back in with Miles
        if revision is not None:
            collection = f"alf/{probe_name}/pykilosort/{revision}"
        else:
            collection = f"alf/{probe_name}/pykilosort"

        spike_times[probe_name] = one.load_dataset(eid, "spikes.times", collection=collection)
        spike_clusters[probe_name] = one.load_dataset(eid, "spikes.clusters", collection=collection)
        cluster_uuids[probe_name] = one.load_dataset(eid, "clusters.uuids", collection=collection)

        # pre-sort for fast access
        sort_ix = np.argsort(spike_clusters[probe_name])
        spike_clusters[probe_name] = spike_clusters[probe_name][sort_ix]
        spike_times[probe_name] = spike_times[probe_name][sort_ix]

    for ix in inds:
        probe_name = units_table.loc[ix, "probe_name"]
        uuid = units_table.loc[ix, "uuid"]
        nwb_spike_times = units_table.loc[ix, "spike_times"]

        cluster_id = np.where(cluster_uuids[probe_name] == uuid)[0][0]
        one_spike_times = get_spikes_for_cluster(spike_clusters[probe_name], spike_times[probe_name], cluster_id)

        # more obvious, but slower for more than ~20 checks
        # one_spike_times = spike_times[probe_name][spike_clusters[probe_name] == cluster_id]

        # assert agreement to within one sample at the 30 kHz sampling rate
        testing.assert_array_less(np.max(np.abs(one_spike_times - nwb_spike_times)) * 30000, 1)
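A possible driver for these checks, sketched by the editor (not part of the diff; the NWB path and eid are hypothetical, while the ONE credentials mirror the ones used earlier in this PR):

from one.api import ONE
from pynwb import NWBHDF5IO

eid = "..."  # hypothetical experiment uuid
one = ONE(
    base_url="https://openalyx.internationalbrainlab.org",
    password="international",
    silent=True,
)

with NWBHDF5IO("path/to/session.nwb", mode="r") as io:  # hypothetical path
    nwbfile = io.read()
    test_WheelInterface(nwbfile, one, eid)
    test_LickInterface(nwbfile, one, eid)
    test_RoiMotionEnergyInterface(nwbfile, one, eid)
    test_IblPoseEstimationInterface(nwbfile, one, eid)
    test_BrainwideMapTrialsInterface(nwbfile, one, eid)
    test_PupilTrackingInterface(nwbfile, one, eid)
    test_IblSortingInterface(nwbfile, one, eid)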