diff --git a/.gitignore b/.gitignore index 5fed3cb..2e13d1e 100644 --- a/.gitignore +++ b/.gitignore @@ -107,8 +107,7 @@ ENV/ .mypy_cache/ # datajoint -dj_local_conf.json -dj_local_conf_old.json +dj_local_con*.json # emacs **/*~ @@ -122,3 +121,6 @@ Diagram.ipynb # vscode .vscode/settings.json + +# notes +temp* \ No newline at end of file diff --git a/Dockerfile.dev b/Dockerfile.dev index 3ebcbb2..ab9614f 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -26,7 +26,7 @@ RUN pip install -e /main/element-session RUN pip install -e /main/element-array-ephys RUN pip install -e /main/workflow-array-ephys RUN pip install -r /main/workflow-array-ephys/requirements_test.txt - + WORKDIR /main/workflow-array-ephys -ENTRYPOINT ["tail", "-f", "/dev/null"] \ No newline at end of file +ENTRYPOINT ["tail", "-f", "/dev/null"] diff --git a/Dockerfile.test b/Dockerfile.test index e336ffe..d19bf6b 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -8,20 +8,32 @@ USER anaconda WORKDIR /main/workflow-array-ephys # Option 1 - Install DataJoint's remote fork of the workflow and elements -RUN git clone https://github.com/datajoint/workflow-array-ephys.git /main/workflow-array-ephys +# RUN git clone https://github.com/datajoint/workflow-array-ephys.git /main/ -# Option 2 - Install user's remote fork of element and workflow +# Option 2 - Install user's remote fork of element and workflow # or an unreleased version of the element +# RUN pip install git+https://github.com/element-lab.git +# RUN pip install git+https://github.com//element-animal.git +# RUN pip install git+https://github.com//element-session.git # RUN pip install git+https://github.com//element-array-ephys.git # RUN git clone https://github.com//workflow-array-ephys.git /main/workflow-array-ephys # Option 3 - Install user's local fork of element and workflow -# RUN mkdir /main/element-array-ephys -# COPY --chown=anaconda:anaconda ./element-array-ephys /main/element-array-ephys -# RUN pip install /main/element-array-ephys -# COPY --chown=anaconda:anaconda ./workflow-array-ephys /main/workflow-array-ephys +RUN mkdir /main/element-lab +COPY --chown=anaconda:anaconda ./element-lab /main/element-lab +RUN pip install -e /main/element-lab +RUN mkdir /main/element-animal +COPY --chown=anaconda:anaconda ./element-animal /main/element-animal +RUN pip install -e /main/element-animal +RUN mkdir /main/element-session +COPY --chown=anaconda:anaconda ./element-session /main/element-session +RUN pip install -e /main/element-session +RUN mkdir /main/element-array-ephys +COPY --chown=anaconda:anaconda ./element-array-ephys /main/element-array-ephys +RUN pip install -e /main/element-array-ephys +COPY --chown=anaconda:anaconda ./workflow-array-ephys /main/workflow-array-ephys # RUN rm -f /main/workflow-array-ephys/dj_local_conf.json # Install the workflow RUN pip install /main/workflow-array-ephys -RUN pip install -r /main/workflow-array-ephys/requirements_test.txt \ No newline at end of file +RUN pip install -r /main/workflow-array-ephys/requirements_test.txt diff --git a/README.md b/README.md index 4c84167..9834127 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # DataJoint Workflow - Array Electrophysiology -Workflow for extracellular array electrophysiology data acquired with a polytrode probe (e.g. -Neuropixels, Neuralynx) using the `SpikeGLX` or `OpenEphys` acquisition software and processed +Workflow for extracellular array electrophysiology data acquired with a polytrode probe (e.g. +Neuropixels, Neuralynx) using the `SpikeGLX` or `OpenEphys` acquisition software and processed with MATLAB- or python-based `Kilosort` spike sorting software. A complete electrophysiology workflow can be built using the DataJoint Elements. @@ -11,17 +11,17 @@ A complete electrophysiology workflow can be built using the DataJoint Elements. + [element-array-ephys](https://github.com/datajoint/element-array-ephys) This repository provides demonstrations for: -1. Set up a workflow using DataJoint Elements (see +1. Set up a workflow using DataJoint Elements (see [workflow_array_ephys/pipeline.py](workflow_array_ephys/pipeline.py)) -2. Ingestion of data/metadata based on a predefined file structure, file naming -convention, and directory lookup methods (see +2. Ingestion of data/metadata based on a predefined file structure, file naming +convention, and directory lookup methods (see [workflow_array_ephys/paths.py](workflow_array_ephys/paths.py)). 3. Ingestion of clustering results. ## Workflow architecture The electrophysiology workflow presented here uses components from 4 DataJoint -Elements (`element-lab`, `element-animal`, `element-session`, +Elements (`element-lab`, `element-animal`, `element-session`, `element-array-ephys`) assembled together to form a fully functional workflow. ### element-lab @@ -40,12 +40,12 @@ https://github.com/datajoint/element-animal/blob/main/images/subject_diagram.svg ## Installation instructions -+ The installation instructions can be found at [datajoint-elements/install.md]( - https://github.com/datajoint/datajoint-elements/blob/main/install.md). ++ The installation instructions can be found at the +[datajoint-elements repository](https://github.com/datajoint/datajoint-elements/blob/main/gh-pages/docs/install.md). ## Interacting with the DataJoint workflow + Please refer to the following workflow-specific -[Jupyter notebooks](/notebooks) for an in-depth explanation of how to run the -workflow ([03-process.ipynb](notebooks/03-process.ipynb)) and explore the data -([05-explore.ipynb](notebooks/05-explore.ipynb)). \ No newline at end of file + [Jupyter notebooks](/notebooks) for an in-depth explanation of how to run the + workflow ([03-process.ipynb](notebooks/03-process.ipynb)) and explore the data + ([05-explore.ipynb](notebooks/05-explore.ipynb)). diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index 6bfb12d..a921aba 100644 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -30,4 +30,4 @@ services: db: condition: service_healthy networks: - main: \ No newline at end of file + main: diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml index d894fd5..9d020bb 100644 --- a/docker-compose-test.yaml +++ b/docker-compose-test.yaml @@ -1,4 +1,7 @@ +# export COMPOSE_DOCKER_CLI_BUILD=0 # some machines need for smooth --build +# .env file: TEST_DATA_DIR= # docker-compose -f docker-compose-test.yaml up --build +# docker exec -it workflow-array-ephys_workflow_1 /bin/bash # docker-compose -f docker-compose-test.yaml down version: "2.4" @@ -22,20 +25,25 @@ services: - DJ_HOST=db - DJ_USER=root - DJ_PASS=simple - - EPHYS_ROOT_DATA_DIR=/main/test_data + - EPHYS_ROOT_DATA_DIR=/main/test_data/workflow_ephys_data1/,/main/test_data/workflow_ephys_data2/ - DATABASE_PREFIX=test_ command: - bash - -c - | echo "------ INTEGRATION TESTS ------" - pytest -sv --cov-report term-missing --cov=workflow-array-ephys -p no:warnings + pytest -sv --cov-report term-missing --cov=workflow_array_ephys -p no:warnings tests/ tail -f /dev/null volumes: - ${TEST_DATA_DIR}:/main/test_data - ./apt_requirements.txt:/tmp/apt_requirements.txt + - ../element-lab:/main/element-lab + - ../element-animal:/main/element-animal + - ../element-session:/main/element-session + - ../element-array-ephys:/main/element-array-ephys + - .:/main/workflow-array-ephys depends_on: db: condition: service_healthy networks: - main: \ No newline at end of file + main: diff --git a/requirements.txt b/requirements.txt index 0506252..f5a0419 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ datajoint>=0.13.0 element-array-ephys==0.1.0b0 -element-lab==0.1.0b0 +element-lab>=0.1.0b0 element-animal==0.1.0b0 element-session==0.1.0b0 element-interface @ git+https://github.com/datajoint/element-interface.git -ipykernel==6.0.1 \ No newline at end of file +ipykernel==6.0.1 diff --git a/tests/__init__.py b/tests/__init__.py index 4b5e37c..0777369 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,19 +1,24 @@ -# run tests: pytest -sv --cov-report term-missing --cov=workflow-array-ephys -p no:warnings +# run all tests: pytest -sv --cov-report term-missing \ +# --cov=workflow_array_ephys -p no:warnings tests/ +# run one test, debug: pytest [above options] --pdb tests/tests_name.py -k \ +# function_name import os +import sys import pytest import pandas as pd import pathlib import datajoint as dj -import numpy as np import workflow_array_ephys from workflow_array_ephys.paths import get_ephys_root_data_dir +from element_interface.utils import find_full_path # ------------------- SOME CONSTANTS ------------------- _tear_down = True +verbose = False test_user_data_dir = pathlib.Path('./tests/user_data') test_user_data_dir.mkdir(exist_ok=True) @@ -26,19 +31,32 @@ 'subject5/session1', 'subject6/session1'] +# -------------------- HELPER CLASS -------------------- + + +class QuietStdOut: + """If verbose set to false, used to quiet tear_down table.delete prints""" + def __enter__(self): + self._original_stdout = sys.stdout + sys.stdout = open(os.devnull, 'w') + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.stdout.close() + sys.stdout = self._original_stdout + +# ---------------------- FIXTURES ---------------------- -# ------------------- FIXTURES ------------------- @pytest.fixture(autouse=True) def dj_config(): + """ If dj_local_config exists, load""" if pathlib.Path('./dj_local_conf.json').exists(): dj.config.load('./dj_local_conf.json') dj.config['safemode'] = False - dj.config['custom'] = { 'database.prefix': (os.environ.get('DATABASE_PREFIX') or dj.config['custom']['database.prefix']), - 'ephys_root_data_dir': (os.environ.get('EPHYS_ROOT_DATA_DIR') + 'ephys_root_data_dir': (os.environ.get('EPHYS_ROOT_DATA_DIR').split(',') or dj.config['custom']['ephys_root_data_dir']) } return @@ -46,31 +64,43 @@ def dj_config(): @pytest.fixture(autouse=True) def test_data(dj_config): - test_data_dir = pathlib.Path(dj.config['custom']['ephys_root_data_dir']) - - test_data_exists = np.all([(test_data_dir / p).exists() for p in sessions_dirs]) + """If data does not exist or partial data is present, + attempt download with DJArchive to the first listed root directory""" + test_data_exists = True + for p in sessions_dirs: + try: + find_full_path(get_ephys_root_data_dir(), p) + except FileNotFoundError: + test_data_exists = False # If data not found - if not test_data_exists: + if not test_data_exists: # attempt to djArchive dowload try: dj.config['custom'].update({ - 'djarchive.client.endpoint': os.environ['DJARCHIVE_CLIENT_ENDPOINT'], - 'djarchive.client.bucket': os.environ['DJARCHIVE_CLIENT_BUCKET'], - 'djarchive.client.access_key': os.environ['DJARCHIVE_CLIENT_ACCESSKEY'], - 'djarchive.client.secret_key': os.environ['DJARCHIVE_CLIENT_SECRETKEY'] + 'djarchive.client.endpoint': + os.environ['DJARCHIVE_CLIENT_ENDPOINT'], + 'djarchive.client.bucket': + os.environ['DJARCHIVE_CLIENT_BUCKET'], + 'djarchive.client.access_key': + os.environ['DJARCHIVE_CLIENT_ACCESSKEY'], + 'djarchive.client.secret_key': + os.environ['DJARCHIVE_CLIENT_SECRETKEY'] }) except KeyError as e: raise FileNotFoundError( - f'Test data not available at {test_data_dir}.' + f' Full test data not available.' f'\nAttempting to download from DJArchive,' f' but no credentials found in environment variables.' f'\nError: {str(e)}') import djarchive_client client = djarchive_client.client() - workflow_version = workflow_array_ephys.version.__version__ - client.download('workflow-array-ephys-test-set', - workflow_version.replace('.', '_'), + test_data_dir = get_ephys_root_data_dir() + if isinstance(test_data_dir, list): # if multiple root dirs, first + test_data_dir = test_data_dir[0] + + client.download('workflow-array-ephys-benchmark', + 'v2', str(test_data_dir), create_target=False) return @@ -78,16 +108,17 @@ def test_data(dj_config): @pytest.fixture def pipeline(): from workflow_array_ephys import pipeline - yield {'subject': pipeline.subject, 'lab': pipeline.lab, 'ephys': pipeline.ephys, 'probe': pipeline.probe, 'session': pipeline.session, 'get_ephys_root_data_dir': pipeline.get_ephys_root_data_dir} - - if _tear_down: + if verbose and _tear_down: pipeline.subject.Subject.delete() + elif not verbose and _tear_down: + with QuietStdOut(): + pipeline.subject.Subject.delete() @pytest.fixture @@ -100,34 +131,35 @@ def subjects_csv(): 'subject3', 'subject4', 'subject5', 'subject6'] input_subjects.sex = ['F', 'M', 'M', 'M', 'F', 'F'] - input_subjects.subject_birth_date = ['2020-01-01 00:00:01', '2020-01-01 00:00:01', - '2020-01-01 00:00:01', '2020-01-01 00:00:01', - '2020-01-01 00:00:01', '2020-01-01 00:00:01'] + input_subjects.subject_birth_date = ['2020-01-01 00:00:01', + '2020-01-01 00:00:01', + '2020-01-01 00:00:01', + '2020-01-01 00:00:01', + '2020-01-01 00:00:01', + '2020-01-01 00:00:01'] input_subjects.subject_description = ['dl56', 'SC035', 'SC038', 'oe_talab', 'rich', 'manuel'] input_subjects = input_subjects.set_index('subject') subjects_csv_path = pathlib.Path('./tests/user_data/subjects.csv') - input_subjects.to_csv(subjects_csv_path) # write csv file + input_subjects.to_csv(subjects_csv_path) # write csv file yield input_subjects, subjects_csv_path - subjects_csv_path.unlink() # delete csv file after use + subjects_csv_path.unlink() # delete csv file after use @pytest.fixture def ingest_subjects(pipeline, subjects_csv): from workflow_array_ephys.ingest import ingest_subjects _, subjects_csv_path = subjects_csv - ingest_subjects(subjects_csv_path) + ingest_subjects(subjects_csv_path, verbose=verbose) return @pytest.fixture def sessions_csv(test_data): """ Create a 'sessions.csv' file""" - root_dir = pathlib.Path(get_ephys_root_data_dir()) - input_sessions = pd.DataFrame(columns=['subject', 'session_dir']) input_sessions.subject = ['subject1', 'subject2', 'subject2', 'subject3', 'subject4', 'subject5', @@ -147,18 +179,21 @@ def sessions_csv(test_data): def ingest_sessions(ingest_subjects, sessions_csv): from workflow_array_ephys.ingest import ingest_sessions _, sessions_csv_path = sessions_csv - ingest_sessions(sessions_csv_path) + ingest_sessions(sessions_csv_path, verbose=verbose) return @pytest.fixture def testdata_paths(): + """ Paths for test data 'subjectX/sessionY/probeZ/etc'""" return { 'npx3A-p1-ks': 'subject5/session1/probe_1/ks2.1_01', 'npx3A-p2-ks': 'subject5/session1/probe_2/ks2.1_01', - 'oe_npx3B-ks': 'subject4/experiment1/recording1/continuous/Neuropix-PXI-100.0/ks', + 'oe_npx3B-ks': 'subject4/experiment1/recording1/continuous/' + + 'Neuropix-PXI-100.0/ks', 'sglx_npx3A-p1': 'subject5/session1/probe_1', - 'oe_npx3B': 'subject4/experiment1/recording1/continuous/Neuropix-PXI-100.0', + 'oe_npx3B': 'subject4/experiment1/recording1/continuous/' + + 'Neuropix-PXI-100.0', 'sglx_npx3B-p1': 'subject6/session1/towersTask_g0_imec0', 'npx3B-p1-ks': 'subject6/session1/towersTask_g0_imec0' } @@ -166,6 +201,7 @@ def testdata_paths(): @pytest.fixture def kilosort_paramset(pipeline): + """Insert kilosort parameters into ephys.ClusteringParamset""" ephys = pipeline['ephys'] params_ks = { @@ -193,18 +229,23 @@ def kilosort_paramset(pipeline): "useRAM": 0 } - # doing the insert here as well, since most of the test will require this paramset inserted + # Insert here, since most of the test will require this paramset inserted ephys.ClusteringParamSet.insert_new_params( 'kilosort2', 0, 'Spike sorting using Kilosort2', params_ks) yield params_ks if _tear_down: - (ephys.ClusteringParamSet & 'paramset_idx = 0').delete() + if verbose: + (ephys.ClusteringParamSet & 'paramset_idx = 0').delete() + else: + with QuietStdOut(): + (ephys.ClusteringParamSet & 'paramset_idx = 0').delete() @pytest.fixture def ephys_recordings(pipeline, ingest_sessions): + """Populate ephys.EphysRecording""" ephys = pipeline['ephys'] ephys.EphysRecording.populate() @@ -212,34 +253,43 @@ def ephys_recordings(pipeline, ingest_sessions): yield if _tear_down: - ephys.EphysRecording.delete() + if verbose: + ephys.EphysRecording.delete() + else: + with QuietStdOut(): + ephys.EphysRecording.delete() @pytest.fixture def clustering_tasks(pipeline, kilosort_paramset, ephys_recordings): + """Insert keys from ephys.EphysRecording into ephys.Clustering""" ephys = pipeline['ephys'] - get_ephys_root_data_dir = pipeline['get_ephys_root_data_dir'] - root_dir = pathlib.Path(get_ephys_root_data_dir()) - for ephys_rec_key in (ephys.EphysRecording - ephys.ClusteringTask).fetch('KEY'): - ephys_file = root_dir / (ephys.EphysRecording.EphysFile - & ephys_rec_key).fetch('file_path')[0] + ephys_file_path = pathlib.Path(((ephys.EphysRecording.EphysFile & ephys_rec_key + ).fetch('file_path'))[0]) + ephys_file = find_full_path(get_ephys_root_data_dir(), ephys_file_path) recording_dir = ephys_file.parent kilosort_dir = next(recording_dir.rglob('spike_times.npy')).parent ephys.ClusteringTask.insert1({**ephys_rec_key, 'paramset_idx': 0, - 'clustering_output_dir': kilosort_dir.as_posix()}, - skip_duplicates=True) + 'clustering_output_dir': + kilosort_dir.as_posix() + }, skip_duplicates=True) yield if _tear_down: - ephys.ClusteringTask.delete() + if verbose: + ephys.ClusteringTask.delete() + else: + with QuietStdOut(): + ephys.ClusteringTask.delete() @pytest.fixture def clustering(clustering_tasks, pipeline): + """Populate ephys.Clustering""" ephys = pipeline['ephys'] ephys.Clustering.populate() @@ -247,11 +297,16 @@ def clustering(clustering_tasks, pipeline): yield if _tear_down: - ephys.Clustering.delete() + if verbose: + ephys.Clustering.delete() + else: + with QuietStdOut(): + ephys.Clustering.delete() @pytest.fixture def curations(clustering, pipeline): + """Insert keys from ephys.ClusteringTask into ephys.Curation""" ephys = pipeline['ephys'] for key in (ephys.ClusteringTask - ephys.Curation).fetch('KEY'): @@ -260,4 +315,8 @@ def curations(clustering, pipeline): yield if _tear_down: - ephys.Curation.delete() + if verbose: + ephys.Curation.delete() + else: + with QuietStdOut(): + ephys.Curation.delete() diff --git a/tests/test_export.py b/tests/test_export.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 5dfba93..e8d1992 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -7,8 +7,14 @@ testdata_paths, kilosort_paramset, ephys_recordings, clustering_tasks, clustering, curations) +# Set all to pass linter warning: PEP8 F811 +__all__ = ['dj_config', 'pipeline', 'test_data', 'subjects_csv', 'ingest_subjects', + 'sessions_csv', 'ingest_sessions', 'testdata_paths', 'kilosort_paramset', + 'ephys_recordings', 'clustering_tasks', 'clustering', 'curations'] + def test_ingest_subjects(pipeline, ingest_subjects): + """ Check number of subjects inserted into the `subject.Subject` table """ subject = pipeline['subject'] assert len(subject.Subject()) == 6 @@ -17,7 +23,6 @@ def test_ingest_sessions(pipeline, sessions_csv, ingest_sessions): ephys = pipeline['ephys'] probe = pipeline['probe'] session = pipeline['session'] - get_ephys_root_data_dir = pipeline['get_ephys_root_data_dir'] assert len(session.Session()) == 7 assert len(probe.Probe()) == 9 @@ -34,50 +39,64 @@ def test_find_valid_full_path(pipeline, sessions_csv): from element_interface.utils import find_full_path get_ephys_root_data_dir = pipeline['get_ephys_root_data_dir'] + ephys_root_data_dir = ([get_ephys_root_data_dir()] + if not isinstance(get_ephys_root_data_dir(), list) + else get_ephys_root_data_dir()) # add more options for root directories - if sys.platform == 'win32': - ephys_root_data_dir = [get_ephys_root_data_dir(), 'J:/', 'M:/'] + if sys.platform == 'win32': # win32 even if Windows 64-bit + ephys_root_data_dir = ephys_root_data_dir + ['J:/', 'M:/'] else: - ephys_root_data_dir = [get_ephys_root_data_dir(), 'mnt/j', 'mnt/m'] + ephys_root_data_dir = ephys_root_data_dir + ['mnt/j', 'mnt/m'] # test: providing relative-path: correctly search for the full-path sessions, _ = sessions_csv sess = sessions.iloc[0] - session_full_path = pathlib.Path(get_ephys_root_data_dir()) / sess.session_dir + session_full_path = find_full_path(ephys_root_data_dir, sess.session_dir) - full_path = find_full_path(ephys_root_data_dir, sess.session_dir) + docker_full_path = pathlib.Path('/main/test_data/workflow_ephys_data1/' + + 'subject1/session1') - assert full_path == session_full_path + assert docker_full_path == session_full_path, str('Session path does not match ' + + 'docker root: ' + + f'{docker_full_path}') def test_find_root_directory(pipeline, sessions_csv): + """ + Test that `find_root_directory` works correctly. + """ from element_interface.utils import find_root_directory get_ephys_root_data_dir = pipeline['get_ephys_root_data_dir'] - + ephys_root_data_dir = ([get_ephys_root_data_dir()] + if not isinstance(get_ephys_root_data_dir(), list) + else get_ephys_root_data_dir()) # add more options for root directories if sys.platform == 'win32': - ephys_root_data_dir = [get_ephys_root_data_dir(), 'J:/', 'M:/'] + ephys_root_data_dir = ephys_root_data_dir + ['J:/', 'M:/'] else: - ephys_root_data_dir = [get_ephys_root_data_dir(), 'mnt/j', 'mnt/m'] + ephys_root_data_dir = ephys_root_data_dir + ['mnt/j', 'mnt/m'] # test: providing full-path: correctly search for the root_dir sessions, _ = sessions_csv sess = sessions.iloc[0] - session_full_path = pathlib.Path(get_ephys_root_data_dir()) / sess.session_dir + # set to /main/, will only work in docker environment + session_full_path = pathlib.Path('/main/test_data/workflow_ephys_data1', + sess.session_dir) root_dir = find_root_directory(ephys_root_data_dir, session_full_path) - assert root_dir.as_posix() == get_ephys_root_data_dir() + assert root_dir.as_posix() == '/main/test_data/workflow_ephys_data1',\ + 'Root path does not match: /main/test_data/workflow_ephys_data1' def test_paramset_insert(kilosort_paramset, pipeline): ephys = pipeline['ephys'] from element_interface.utils import dict_to_uuid - method, desc, paramset_hash = (ephys.ClusteringParamSet & {'paramset_idx': 0}).fetch1( + method, desc, paramset_hash = (ephys.ClusteringParamSet + & {'paramset_idx': 0}).fetch1( 'clustering_method', 'paramset_desc', 'param_set_hash') assert method == 'kilosort2' assert desc == 'Spike sorting using Kilosort2' assert dict_to_uuid(kilosort_paramset) == paramset_hash - diff --git a/tests/test_pipeline_generation.py b/tests/test_pipeline_generation.py index 06cfa1d..f4c3247 100644 --- a/tests/test_pipeline_generation.py +++ b/tests/test_pipeline_generation.py @@ -16,4 +16,5 @@ def test_generate_pipeline(pipeline): session_tbl, probe_tbl = ephys.ProbeInsertion.parents(as_objects=True) assert session_tbl.full_table_name == session.Session.full_table_name assert probe_tbl.full_table_name == probe.Probe.full_table_name - assert 'spike_times' in ephys.CuratedClustering.Unit.heading.secondary_attributes + assert 'spike_times' in (ephys.CuratedClustering.Unit.heading. + secondary_attributes) diff --git a/tests/test_populate.py b/tests/test_populate.py index aaac152..c24350b 100644 --- a/tests/test_populate.py +++ b/tests/test_populate.py @@ -1,5 +1,9 @@ import numpy as np +__all__ = ['dj_config', 'pipeline', 'test_data', 'subjects_csv', 'ingest_subjects', + 'sessions_csv', 'ingest_sessions', 'testdata_paths', 'kilosort_paramset', + 'ephys_recordings', 'clustering_tasks', 'clustering', 'curations'] + from . import (dj_config, pipeline, test_data, subjects_csv, ingest_subjects, sessions_csv, ingest_sessions, @@ -13,10 +17,15 @@ def test_ephys_recording_populate(pipeline, ephys_recordings): def test_LFP_populate_npx3B_OpenEphys(testdata_paths, pipeline, ephys_recordings): + """ + Populate ephys.LFP with OpenEphys items, + recording Neuropixels Phase 3B (Neuropixels 1.0) probe + """ ephys = pipeline['ephys'] rel_path = testdata_paths['oe_npx3B'] rec_key = (ephys.EphysRecording & (ephys.EphysRecording.EphysFile - & f'file_path LIKE "%{rel_path}"')).fetch1('KEY') + & f'file_path LIKE "%{rel_path}"') + ).fetch1('KEY') ephys.LFP.populate(rec_key) lfp_mean = (ephys.LFP & rec_key).fetch1('lfp_mean') @@ -25,18 +34,20 @@ def test_LFP_populate_npx3B_OpenEphys(testdata_paths, pipeline, ephys_recordings electrodes = (ephys.LFP.Electrode & rec_key).fetch('electrode') assert np.array_equal( electrodes, - np.array([5, 14, 23, 32, 41, 50, 59, 68, 77, 86, 95, 104, 113, - 122, 131, 140, 149, 158, 167, 176, 185, 194, 203, 212, 221, 230, - 239, 248, 257, 266, 275, 284, 293, 302, 311, 320, 329, 338, 347, - 356, 365, 374, 383])) + np.array([5, 14, 23, 32, 41, 50, 59, 68, 77, 86, 95, 104, + 113, 122, 131, 140, 149, 158, 167, 176, 185, 194, 203, 212, + 221, 230, 239, 248, 257, 266, 275, 284, 293, 302, 311, 320, + 329, 338, 347, 356, 365, 374, 383])) def test_LFP_populate_npx3A_SpikeGLX(testdata_paths, pipeline, ephys_recordings): + """Populate ephys.LFP with SpikeGLX items, recording Neuropixels Phase 3A probe""" ephys = pipeline['ephys'] rel_path = testdata_paths['sglx_npx3A-p1'] rec_key = (ephys.EphysRecording & (ephys.EphysRecording.EphysFile - & f'file_path LIKE "%{rel_path}%"')).fetch1('KEY') + & f'file_path LIKE "%{rel_path}%"') + ).fetch1('KEY') ephys.LFP.populate(rec_key) lfp_mean = (ephys.LFP & rec_key).fetch1('lfp_mean') @@ -45,18 +56,24 @@ def test_LFP_populate_npx3A_SpikeGLX(testdata_paths, pipeline, ephys_recordings) electrodes = (ephys.LFP.Electrode & rec_key).fetch('electrode') assert np.array_equal( electrodes, - np.array([5, 14, 23, 32, 41, 50, 59, 68, 77, 86, 95, 104, 113, - 122, 131, 140, 149, 158, 167, 176, 185, 194, 203, 212, 221, 230, - 239, 248, 257, 266, 275, 284, 293, 302, 311, 320, 329, 338, 347, - 356, 365, 374, 383])) + np.array([5, 14, 23, 32, 41, 50, 59, 68, 77, 86, 95, 104, + 113, 122, 131, 140, 149, 158, 167, 176, 185, 194, 203, 212, + 221, 230, 239, 248, 257, 266, 275, 284, 293, 302, 311, 320, + 329, 338, 347, 356, 365, 374, 383])) def test_LFP_populate_npx3B_SpikeGLX(testdata_paths, pipeline, ephys_recordings): + """ + Populate ephys.LFP with SpikeGLX items, + recording Neuropixels Phase 3B (Neuropixels 1.0) probe + """ + ephys = pipeline['ephys'] rel_path = testdata_paths['sglx_npx3B-p1'] rec_key = (ephys.EphysRecording & (ephys.EphysRecording.EphysFile - & f'file_path LIKE "%{rel_path}%"')).fetch1('KEY') + & f'file_path LIKE "%{rel_path}%"') + ).fetch1('KEY') ephys.LFP.populate(rec_key) lfp_mean = (ephys.LFP & rec_key).fetch1('lfp_mean') @@ -65,10 +82,10 @@ def test_LFP_populate_npx3B_SpikeGLX(testdata_paths, pipeline, ephys_recordings) electrodes = (ephys.LFP.Electrode & rec_key).fetch('electrode') assert np.array_equal( electrodes, - np.array([5, 14, 23, 32, 41, 50, 59, 68, 77, 86, 95, 104, 113, - 122, 131, 140, 149, 158, 167, 176, 185, 194, 203, 212, 221, 230, - 239, 248, 257, 266, 275, 284, 293, 302, 311, 320, 329, 338, 347, - 356, 365, 374, 383])) + np.array([5, 14, 23, 32, 41, 50, 59, 68, 77, 86, 95, 104, + 113, 122, 131, 140, 149, 158, 167, 176, 185, 194, 203, 212, + 221, 230, 239, 248, 257, 266, 275, 284, 293, 302, 311, 320, + 329, 338, 347, 356, 365, 374, 383])) def test_clustering_populate(clustering, pipeline): @@ -77,50 +94,64 @@ def test_clustering_populate(clustering, pipeline): def test_curated_clustering_populate(curations, pipeline, testdata_paths): + """Populate ephys.CuratedClustering with multiple recordings""" ephys = pipeline['ephys'] rel_path = testdata_paths['npx3A-p1-ks'] - curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"').fetch1('KEY') + curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"' + ).fetch1('KEY') ephys.CuratedClustering.populate(curation_key) assert len(ephys.CuratedClustering.Unit & curation_key & 'cluster_quality_label = "good"') == 76 rel_path = testdata_paths['oe_npx3B-ks'] - curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"').fetch1('KEY') + curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"' + ).fetch1('KEY') ephys.CuratedClustering.populate(curation_key) assert len(ephys.CuratedClustering.Unit & curation_key & 'cluster_quality_label = "good"') == 68 rel_path = testdata_paths['npx3B-p1-ks'] - curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"').fetch1('KEY') + curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"' + ).fetch1('KEY') ephys.CuratedClustering.populate(curation_key) assert len(ephys.CuratedClustering.Unit & curation_key & 'cluster_quality_label = "good"') == 55 def test_waveform_populate_npx3B_OpenEphys(curations, pipeline, testdata_paths): + """ + Populate ephys.WaveformSet with OpenEphys + Neuropixels Phase 3B (Neuropixels 1.0) probe + """ ephys = pipeline['ephys'] - rel_path = testdata_paths['oe_npx3B-ks'] - curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"').fetch1('KEY') + curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"' + ).fetch1('KEY') ephys.CuratedClustering.populate(curation_key) ephys.WaveformSet.populate(curation_key) - waveforms = np.vstack((ephys.WaveformSet.PeakWaveform & curation_key).fetch( - 'peak_electrode_waveform')) + waveforms = np.vstack((ephys.WaveformSet.PeakWaveform & curation_key + ).fetch('peak_electrode_waveform')) assert waveforms.shape == (204, 64) def test_waveform_populate_npx3B_SpikeGLX(curations, pipeline, testdata_paths): + """ + Populate ephys.WaveformSet with SpikeGLX + Neuropixels Phase 3B (Neuropixels 1.0) probe + """ + ephys = pipeline['ephys'] rel_path = testdata_paths['npx3B-p1-ks'] - curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"').fetch1('KEY') + curation_key = (ephys.Curation & f'curation_output_dir LIKE "%{rel_path}"' + ).fetch1('KEY') ephys.CuratedClustering.populate(curation_key) ephys.WaveformSet.populate(curation_key) - waveforms = np.vstack((ephys.WaveformSet.PeakWaveform & curation_key).fetch( - 'peak_electrode_waveform')) + waveforms = np.vstack((ephys.WaveformSet.PeakWaveform + & curation_key).fetch('peak_electrode_waveform')) assert waveforms.shape == (150, 64) diff --git a/workflow_array_ephys/ingest.py b/workflow_array_ephys/ingest.py index 3a909ae..9ced051 100644 --- a/workflow_array_ephys/ingest.py +++ b/workflow_array_ephys/ingest.py @@ -1,90 +1,118 @@ -import re -import pathlib import csv +import re from workflow_array_ephys.pipeline import subject, ephys, probe, session from workflow_array_ephys.paths import get_ephys_root_data_dir from element_array_ephys.readers import spikeglx, openephys -import element_interface.utils +from element_interface.utils import find_root_directory, find_full_path -def ingest_subjects(subject_csv_path='./user_data/subjects.csv'): + +def ingest_subjects(subject_csv_path='./user_data/subjects.csv', verbose=True): + """ + Ingest subjects listed in the subject column of ./user_data/subjects.csv + """ # -------------- Insert new "Subject" -------------- - with open(subject_csv_path, newline= '') as f: + with open(subject_csv_path, newline='') as f: input_subjects = list(csv.DictReader(f, delimiter=',')) - - print(f'\n---- Insert {len(input_subjects)} entry(s) into subject.Subject ----') + if verbose: + previous_length = len(subject.Subject.fetch()) subject.Subject.insert(input_subjects, skip_duplicates=True) - - -def ingest_sessions(session_csv_path='./user_data/sessions.csv'): - root_data_dir = get_ephys_root_data_dir() - + if verbose: + insert_length = len(subject.Subject.fetch()) - previous_length + print(f'\n---- Insert {insert_length} entry(s) into ' + + 'subject.Subject ----') + print('\n---- Successfully completed ingest_subjects ----') + + +def ingest_sessions(session_csv_path='./user_data/sessions.csv', verbose=True): + """ + Ingests SpikeGLX and OpenEphys files from directories listed + in the session_dir column of ./user_data/sessions.csv + """ # ---------- Insert new "Session" and "ProbeInsertion" --------- - with open(session_csv_path, newline= '') as f: + with open(session_csv_path, newline='') as f: input_sessions = list(csv.DictReader(f, delimiter=',')) # Folder structure: root / subject / session / probe / .ap.meta - session_list, session_dir_list, probe_list, probe_insertion_list = [], [], [], [] + session_list, session_dir_list = [], [] + probe_list, probe_insertion_list = [], [] for sess in input_sessions: - session_dir = element_interface.utils.find_full_path( - get_ephys_root_data_dir(), - sess['session_dir']) + session_dir = find_full_path(get_ephys_root_data_dir(), + sess['session_dir']) session_datetimes, insertions = [], [] # search session dir and determine acquisition software - for ephys_pattern, ephys_acq_type in zip(['*.ap.meta', '*.oebin'], ['SpikeGLX', 'OpenEphys']): + for ephys_pattern, ephys_acq_type in zip(['*.ap.meta', '*.oebin'], + ['SpikeGLX', 'OpenEphys']): ephys_meta_filepaths = [fp for fp in session_dir.rglob(ephys_pattern)] if len(ephys_meta_filepaths): acq_software = ephys_acq_type break else: - raise FileNotFoundError(f'Ephys recording data not found! Neither SpikeGLX nor OpenEphys recording files found in: {session_dir}') + raise FileNotFoundError('Ephys recording data not found! Neither SpikeGLX ' + + 'nor OpenEphys recording files found in: ' + + f'{session_dir}') if acq_software == 'SpikeGLX': for meta_filepath in ephys_meta_filepaths: spikeglx_meta = spikeglx.SpikeGLXMeta(meta_filepath) - probe_key = {'probe_type': spikeglx_meta.probe_model, 'probe': spikeglx_meta.probe_SN} - if probe_key['probe'] not in [p['probe'] for p in probe_list] and probe_key not in probe.Probe(): + probe_key = {'probe_type': spikeglx_meta.probe_model, + 'probe': spikeglx_meta.probe_SN} + if (probe_key['probe'] not in [p['probe'] for p in probe_list + ] and probe_key not in probe.Probe()): probe_list.append(probe_key) probe_dir = meta_filepath.parent - probe_number = re.search('(imec)?\d{1}$', probe_dir.name).group() + probe_number = re.search('(imec)?\d{1}$', probe_dir.name + ).group() probe_number = int(probe_number.replace('imec', '')) - insertions.append({'probe': spikeglx_meta.probe_SN, 'insertion_number': int(probe_number)}) + insertions.append({'probe': spikeglx_meta.probe_SN, + 'insertion_number': int(probe_number)}) session_datetimes.append(spikeglx_meta.recording_time) elif acq_software == 'OpenEphys': loaded_oe = openephys.OpenEphys(session_dir) session_datetimes.append(loaded_oe.experiment.datetime) for probe_idx, oe_probe in enumerate(loaded_oe.probes.values()): - probe_key = {'probe_type': oe_probe.probe_model, 'probe': oe_probe.probe_SN} - if probe_key['probe'] not in [p['probe'] for p in probe_list] and probe_key not in probe.Probe(): + probe_key = {'probe_type': oe_probe.probe_model, + 'probe': oe_probe.probe_SN} + if (probe_key['probe'] not in [p['probe'] for p in probe_list + ] and probe_key not in probe.Probe()): probe_list.append(probe_key) - insertions.append({'probe': oe_probe.probe_SN, 'insertion_number': probe_idx}) + insertions.append({'probe': oe_probe.probe_SN, + 'insertion_number': probe_idx}) else: - raise NotImplementedError(f'Unknown acquisition software: {acq_software}') + raise NotImplementedError('Unknown acquisition software: ' + + f'{acq_software}') # new session/probe-insertion - session_key = {'subject': sess['subject'], 'session_datetime': min(session_datetimes)} + session_key = {'subject': sess['subject'], + 'session_datetime': min(session_datetimes)} if session_key not in session.Session(): session_list.append(session_key) - session_dir_list.append({**session_key, 'session_dir': session_dir.relative_to(root_data_dir).as_posix()}) - probe_insertion_list.extend([{**session_key, **insertion} for insertion in insertions]) + root_dir = find_root_directory(get_ephys_root_data_dir(), session_dir) + session_dir_list.append({**session_key, 'session_dir': + session_dir.relative_to(root_dir).as_posix()}) + probe_insertion_list.extend([{**session_key, **insertion + } for insertion in insertions]) - print(f'\n---- Insert {len(session_list)} entry(s) into session.Session ----') session.Session.insert(session_list) session.SessionDirectory.insert(session_dir_list) + if verbose: + print(f'\n---- Insert {len(session_list)} entry(s) into session.Session ----') - print(f'\n---- Insert {len(probe_list)} entry(s) into probe.Probe ----') probe.Probe.insert(probe_list) + if verbose: + print(f'\n---- Insert {len(probe_list)} entry(s) into probe.Probe ----') - print(f'\n---- Insert {len(probe_insertion_list)} entry(s) into ephys.ProbeInsertion ----') ephys.ProbeInsertion.insert(probe_insertion_list) - - print('\n---- Successfully completed workflow_array_ephys/ingest.py ----') + if verbose: + print(f'\n---- Insert {len(probe_insertion_list)} entry(s) into ' + + 'ephys.ProbeInsertion ----') + print('\n---- Successfully completed ingest_subjects ----') if __name__ == '__main__': diff --git a/workflow_array_ephys/paths.py b/workflow_array_ephys/paths.py index c86329b..2c77528 100644 --- a/workflow_array_ephys/paths.py +++ b/workflow_array_ephys/paths.py @@ -2,13 +2,10 @@ def get_ephys_root_data_dir(): - root_data_dirs = dj.config.get('custom', {}).get('ephys_root_data_dir', None) - - return root_data_dirs + return dj.config.get('custom', {}).get('ephys_root_data_dir', None) def get_session_directory(session_key: dict) -> str: from .pipeline import session session_dir = (session.SessionDirectory & session_key).fetch1('session_dir') - - return session_dir \ No newline at end of file + return session_dir diff --git a/workflow_array_ephys/pipeline.py b/workflow_array_ephys/pipeline.py index 11da27c..8975180 100644 --- a/workflow_array_ephys/pipeline.py +++ b/workflow_array_ephys/pipeline.py @@ -16,16 +16,17 @@ db_prefix = dj.config['custom'].get('database.prefix', '') -# Activate "lab", "subject", "session" schema ---------------------------------- +# Activate "lab", "subject", "session" schema --------------------------------- lab.activate(db_prefix + 'lab') subject.activate(db_prefix + 'subject', linking_module=__name__) +Experimenter = lab.User session.activate(db_prefix + 'session', linking_module=__name__) -# Declare table "SkullReference" for use in element-array-ephys ---------------- +# Declare table "SkullReference" for use in element-array-ephys --------------- @lab.schema class SkullReference(dj.Lookup): @@ -35,7 +36,7 @@ class SkullReference(dj.Lookup): contents = zip(['Bregma', 'Lambda']) -# Activate "ephys" schema ------------------------------------------------------ +# Activate "ephys" schema ----------------------------------------------------- ephys.activate(db_prefix + 'ephys', db_prefix + 'probe', diff --git a/workflow_array_ephys/version.py b/workflow_array_ephys/version.py index 97b0c8b..126776b 100644 --- a/workflow_array_ephys/version.py +++ b/workflow_array_ephys/version.py @@ -2,4 +2,4 @@ Package metadata Update the Docker image tag in `docker-compose.yaml` to match """ -__version__ = '0.1.0a4' \ No newline at end of file +__version__ = '0.1.0a4'