diff --git a/Dockerfile b/Dockerfile
index 0da6741..6a3352a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,6 +3,7 @@ LABEL maintainer="Stelios Papadopoulos "
 RUN pip3 install \
     meshparty \
+    pcg-skel \
     cloud-volume \
     analysisdatalink\
     caveclient \
@@ -14,5 +15,5 @@ RUN mkdir -p .cloudvolume/secrets
 RUN echo "{\"token\": \"${CLOUDVOLUME_TOKEN:-}\"}" > .cloudvolume/secrets/cave-secret.json
 COPY . /src/microns-materialization
-RUN pip3 install -e /src/microns-materialization/python/microns-materialization
-RUN pip3 install -e /src/microns-materialization/python/microns-materialization-api
\ No newline at end of file
+RUN pip3 install --prefix=$(python -m site --user-base) -e /src/microns-materialization/python/microns-materialization
+RUN pip3 install --prefix=$(python -m site --user-base) -e /src/microns-materialization/python/microns-materialization-api
\ No newline at end of file
diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml
index 186b43d..69ad199 100644
--- a/deploy/docker-compose.yml
+++ b/deploy/docker-compose.yml
@@ -15,6 +15,7 @@ x-shared: &common
     - /mnt:/mnt
   env_file:
     - .env
+  container_name: "microns-materialization"
 
 services:
   notebook:
diff --git a/deploy/kubernetes/minnie65_download_materialization.py b/deploy/kubernetes/minnie65_download_materialization.py
new file mode 100644
index 0000000..297fed8
--- /dev/null
+++ b/deploy/kubernetes/minnie65_download_materialization.py
@@ -0,0 +1,6 @@
+import os
+
+if __name__ == '__main__':
+    from microns_materialization.minnie_materialization.minnie65_materialization import download_materialization
+    download_materialization(ver=int(os.getenv('MICRONS_MAT_VER_TO_DL')), download_meshes=True, download_synapses=True, loglevel=os.getenv('MICRONS_LOGLEVEL'))
+
diff --git a/deploy/kubernetes/minnie65_download_meshes.py b/deploy/kubernetes/minnie65_download_meshes.py
new file mode 100644
index 0000000..9a0d4d1
--- /dev/null
+++ b/deploy/kubernetes/minnie65_download_meshes.py
@@ -0,0 +1,5 @@
+import os
+
+if __name__ == '__main__':
+    from microns_materialization.minnie_materialization.minnie65_materialization import download_materialization
+    download_materialization(ver=int(os.getenv('MICRONS_MAT_VER_TO_DL')), download_meshes=True, download_synapses=False, loglevel=os.getenv('MICRONS_LOGLEVEL'))
\ No newline at end of file
diff --git a/deploy/kubernetes/minnie65_download_meshwork_objs.py b/deploy/kubernetes/minnie65_download_meshwork_objs.py
new file mode 100644
index 0000000..ebd1e69
--- /dev/null
+++ b/deploy/kubernetes/minnie65_download_meshwork_objs.py
@@ -0,0 +1,5 @@
+import os
+
+if __name__ == '__main__':
+    from microns_materialization.minnie_materialization.minnie65_materialization import Queue, download_meshwork_objects
+    download_meshwork_objects(Queue.PCGMeshwork, loglevel=os.getenv('MICRONS_LOGLEVEL'))
\ No newline at end of file
diff --git a/deploy/kubernetes/minnie65_download_pcg_skeletons.py b/deploy/kubernetes/minnie65_download_pcg_skeletons.py
new file mode 100644
index 0000000..2426bab
--- /dev/null
+++ b/deploy/kubernetes/minnie65_download_pcg_skeletons.py
@@ -0,0 +1,5 @@
+import os
+
+if __name__ == '__main__':
+    from microns_materialization.minnie_materialization.minnie65_materialization import Queue, download_pcg_skeletons
+    download_pcg_skeletons(Queue.PCGSkeleton, loglevel=os.getenv('MICRONS_LOGLEVEL'))
\ No newline at end of file
diff --git a/deploy/kubernetes/minnie65_download_synapses.py b/deploy/kubernetes/minnie65_download_synapses.py
new file mode 100644
index 0000000..eeb3641
--- /dev/null
+++ b/deploy/kubernetes/minnie65_download_synapses.py
@@ -0,0 +1,5 @@
+import os
+
+if __name__ == '__main__':
+    from microns_materialization.minnie_materialization.minnie65_materialization import download_materialization
+    download_materialization(ver=int(os.getenv('MICRONS_MAT_VER_TO_DL')), download_meshes=False, download_synapses=True, loglevel=os.getenv('MICRONS_LOGLEVEL'))
\ No newline at end of file
diff --git a/python/microns-materialization-api/microns_materialization_api/config/__init__.py b/python/microns-materialization-api/microns_materialization_api/config/__init__.py
index 4be6ebb..ecb3a1b 100644
--- a/python/microns-materialization-api/microns_materialization_api/config/__init__.py
+++ b/python/microns-materialization-api/microns_materialization_api/config/__init__.py
@@ -17,7 +17,7 @@
 minnie65_materialization_config = SchemaConfig(
     module_name='minnie65_materialization',
-    schema_name='microns_minnie65_materialization',
+    schema_name='microns_minnie65_materialization_v3',
     externals=externals.minnie65_materialization,
     adapters=adapters.minnie65_materialization
 )
diff --git a/python/microns-materialization-api/microns_materialization_api/config/adapters.py b/python/microns-materialization-api/microns_materialization_api/config/adapters.py
index f8cd127..bebe813 100644
--- a/python/microns-materialization-api/microns_materialization_api/config/adapters.py
+++ b/python/microns-materialization-api/microns_materialization_api/config/adapters.py
@@ -2,54 +2,53 @@
 Adapters for DataJoint tables.
 """
-import datajoint as dj
-import numpy as np
+import json
+
 import h5py
-import os
 import trimesh
-
-from collections import namedtuple
+from meshparty import meshwork, skeleton
+from microns_utils.adapter_utils import FilePathAdapter, adapt_mesh_hdf5
 
-class MeshAdapter(dj.AttributeAdapter):
-    # Initialize the correct attribute type (allows for use with multiple stores)
-    def __init__(self, attribute_type):
-        self.attribute_type = attribute_type
-        super().__init__()
-
-    attribute_type = '' # this is how the attribute will be declared
+class TrimeshAdapter(FilePathAdapter):
+    def get(self, filepath):
+        filepath = super().get(filepath)
+        mesh = adapt_mesh_hdf5(filepath, parse_filepath_stem=False, return_type='namedtuple')
+        return trimesh.Trimesh(vertices=mesh.vertices, faces=mesh.faces)
 
-    TriangularMesh = namedtuple('TriangularMesh', ['segment_id', 'vertices', 'faces'])
-
-    def put(self, filepath):
-        # save the filepath to the mesh
-        filepath = os.path.abspath(filepath)
-        assert os.path.exists(filepath)
-        return filepath
 
+class MeshworkAdapter(FilePathAdapter):
     def get(self, filepath):
-        # access the h5 file and return a mesh
-        assert os.path.exists(filepath)
+        filepath = super().get(filepath)
+        return meshwork.load_meshwork(filepath)
 
-        with h5py.File(filepath, 'r') as hf:
-            vertices = hf['vertices'][()].astype(np.float64)
-            faces = hf['faces'][()].reshape(-1, 3).astype(np.uint32)
-
-        segment_id = os.path.splitext(os.path.basename(filepath))[0]
-
-        return trimesh.Trimesh(vertices = vertices,faces=faces)
 
+class PCGSkelAdapter(FilePathAdapter):
+    def get(self, filepath):
+        filepath = super().get(filepath)
+        with h5py.File(filepath, 'r') as f:
+            vertices = f['vertices'][()]
+            edges = f['edges'][()]
+            mesh_to_skel_map = f['mesh_to_skel_map'][()]
+            root = f['root'][()]
+            meta = json.loads(f['meta'][()])
+        skel = skeleton.Skeleton(vertices=vertices, edges=edges, mesh_to_skel_map=mesh_to_skel_map, root=root, meta=meta)
+        return skel
+
+# M65
+minnie65_meshes = TrimeshAdapter('filepath@minnie65_meshes')
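# --- Illustrative aside (not part of the diff above): a minimal sketch of what the new file-path
# adapters do when DataJoint fetches a mesh attribute. It mirrors the removed MeshAdapter.get logic;
# the 'vertices'/'faces' dataset names follow that removed code, and the production TrimeshAdapter
# instead delegates to microns_utils.adapter_utils.adapt_mesh_hdf5, so treat this as an
# assumption-laden stand-in rather than the pipeline's actual code path.
import h5py
import numpy as np
import trimesh

def load_trimesh_from_hdf5(filepath):
    """Read an hdf5 mesh file (as stored under filepath@minnie65_meshes) into a trimesh.Trimesh."""
    with h5py.File(filepath, 'r') as hf:
        vertices = hf['vertices'][()].astype(np.float64)
        faces = hf['faces'][()].reshape(-1, 3).astype(np.uint32)
    return trimesh.Trimesh(vertices=vertices, faces=faces)

# Hypothetical usage: mesh = load_trimesh_from_hdf5('/mnt/dj-stor01/microns/minnie65/meshes/<segment_id>.h5')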
+minnie65_meshwork = MeshworkAdapter('filepath@minnie65_meshwork') +minnie65_pcg_skeletons = PCGSkelAdapter('filepath@minnie65_pcg_skeletons') -# instantiate for use as a datajoint type -h01_meshes = MeshAdapter('filepath@h01_meshes') -minnie65_meshes = MeshAdapter('filepath@minnie65_meshes') +minnie65_materialization = { + 'minnie65_meshes': minnie65_meshes, + 'minnie65_meshwork': minnie65_meshwork, + 'minnie65_pcg_skeletons': minnie65_pcg_skeletons +} -# also store in one object for ease of use with virtual modules +# H01 +h01_meshes = TrimeshAdapter('filepath@h01_meshes') h01_materialization = { 'h01_meshes': h01_meshes, } - -minnie65_materialization = { - 'minnie65_meshes': minnie65_meshes, -} diff --git a/python/microns-materialization-api/microns_materialization_api/config/externals.py b/python/microns-materialization-api/microns_materialization_api/config/externals.py index e1422ea..93b9131 100644 --- a/python/microns-materialization-api/microns_materialization_api/config/externals.py +++ b/python/microns-materialization-api/microns_materialization_api/config/externals.py @@ -1,19 +1,26 @@ """ Externals for DataJoint tables. """ -import datajoint.datajoint_plus as djp from pathlib import Path +import datajoint.datajoint_plus as djp + base_path = Path() / '/mnt' / 'dj-stor01' / 'microns' #h01 materialization h01_materialization_external_meshes_path = base_path / 'h01' / 'meshes' + h01_materialization = { 'h01_meshes': djp.make_store_dict(h01_materialization_external_meshes_path), } #minnie65_materialization -minnie65_materialization_external_meshes_path = base_path / 'minnie' / 'meshes' +minnie65_materialization_external_meshes_path = base_path / 'minnie65' / 'meshes' +minnie65_materialization_external_meshwork_path = base_path / 'minnie65' / 'meshwork' +minnie65_materialization_external_pcg_skeletons_path = base_path / 'minnie65' / 'pcg_skeletons' + minnie65_materialization = { 'minnie65_meshes': djp.make_store_dict(minnie65_materialization_external_meshes_path), + 'minnie65_meshwork': djp.make_store_dict(minnie65_materialization_external_meshwork_path), + 'minnie65_pcg_skeletons': djp.make_store_dict(minnie65_materialization_external_pcg_skeletons_path), } diff --git a/python/microns-materialization-api/microns_materialization_api/schemas/minnie65_materialization.py b/python/microns-materialization-api/microns_materialization_api/schemas/minnie65_materialization.py index 337b599..7be62bb 100644 --- a/python/microns-materialization-api/microns_materialization_api/schemas/minnie65_materialization.py +++ b/python/microns-materialization-api/microns_materialization_api/schemas/minnie65_materialization.py @@ -2,256 +2,490 @@ DataJoint tables for importing minnie65 from CAVE. """ import datajoint as dj -import datajoint.datajoint_plus as djp +import datajoint_plus as djp +from microns_utils.misc_utils import classproperty from ..config import minnie65_materialization_config as config config.register_externals() config.register_adapters(context=locals()) -schema = dj.schema(config.schema_name, create_schema=True) +schema = djp.schema(config.schema_name, create_schema=True) + @schema -class Materialization(djp.Manual): +class ImportMethod(djp.Lookup): + hash_name = 'import_method' definition = """ - # version and timestamp of Minnie65 materialization - ver : DECIMAL(6,2) # materialization version - --- - valid=1 : tinyint # marks whether the materialization is valid. Defaults to 1. 
- timestamp : timestamp # marks the time at which the materialization service was started + import_method : varchar(8) # import method hash """ - - class CurrentVersion(djp.Part): + + class MaterializationVer(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = ['caveclient_version', 'datastack', 'ver'] + definition = """ + -> master + --- + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ver: smallint # client materialization version + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ + + class NucleusSegment(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'caveclient_version', 'datastack', 'ver' definition = """ - # version and timestamp of Minnie65 materialization - kind : varchar(16) # selection criteria for setting the current materialization + -> master + --- + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ver: smallint # client materialization version + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ + + class MeshPartyMesh(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'meshparty_version', 'caveclient_version', 'datastack', 'cloudvolume_version', 'cloudvolume_path', 'download_meshes_kwargs', 'target_dir' + definition = """ + -> master + --- + description : varchar(1000) # details + meshparty_version: varchar(48) # version of meshparty installed when method was created + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + cloudvolume_version: varchar(48) # version of cloudvolume installed when method was created + cloudvolume_path: varchar(250) # cloudvolume path used to download meshes + download_meshes_kwargs: varchar(1000) # JSON array passed to meshparty.trimesh_io.download_meshes. Note: use json.loads to recover dict. + target_dir: varchar(1000) # target directory for mesh files + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ + + class MeshPartyMesh2(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'meshparty_version', 'caveclient_version', 'datastack', 'ver', 'cloudvolume_version', 'cloudvolume_path', 'download_meshes_kwargs', 'target_dir' + definition = """ + -> master --- + description : varchar(1000) # details + meshparty_version: varchar(48) # version of meshparty installed when method was created + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ver: smallint # client materialization version + cloudvolume_version: varchar(48) # version of cloudvolume installed when method was created + cloudvolume_path: varchar(250) # cloudvolume path used to download meshes + download_meshes_kwargs: varchar(1000) # JSON array passed to meshparty.trimesh_io.download_meshes. Note: use json.loads to recover dict. 
+ target_dir: varchar(1000) # target directory for mesh files + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ + + class Synapse(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'caveclient_version', 'datastack' + definition = """ -> master - description=NULL : varchar(256) # description of the materialization and source - """ + --- + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ - class Meta(djp.Part): + class Synapse2(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'caveclient_version', 'datastack', 'ver' definition = """ -> master --- - description=NULL : varchar(256) # description of the materialization and source - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ver: smallint # client materialization version + ts_inserted=CURRENT_TIMESTAMP : timestamp """ + class PCGMeshwork(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'meshparty_version', 'pcg_skel_version', 'caveclient_version', 'datastack', 'ver', 'synapse_table', 'nucleus_table', 'cloudvolume_version', 'cloudvolume_path', 'pcg_meshwork_params', 'target_dir' + definition = """ + -> master + --- + description=NULL : varchar(1000) # details + meshparty_version: varchar(48) # version of meshparty installed when method was created + pcg_skel_version: varchar(48) # version of pcg_skel installed when method was created + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ver: smallint # client materialization version + synapse_table : varchar(250) # synapse table in CAVEclient annotation service + nucleus_table : varchar(250) # nucleus table in CAVEclient annotation service + cloudvolume_version: varchar(48) # version of cloudvolume installed when method was created + cloudvolume_path: varchar(250) # cloudvolume path + pcg_meshwork_params: varchar(1000) # JSON array passed to pcg_skel.pcg_meshwork. Note: use json.loads to recover dict. 
+ target_dir: varchar(1000) # target directory for file + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ -stable = Materialization.CurrentVersion & {'kind': 'stable'} -latest = Materialization.CurrentVersion & {'kind': 'latest'} -MICrONS_Mar_2021 = Materialization.CurrentVersion & {'kind': 'MICrONS_Mar_2021'} -MICrONS_Jul_2021 = Materialization.CurrentVersion & {'kind': 'MICrONS_Jul_2021'} -releaseJun2021 = Materialization.CurrentVersion & {'kind': 'release_Jun2021'} + class PCGSkeleton(djp.Part): + enable_hashing = True + hash_name = 'import_method' + hashed_attrs = 'meshparty_version', 'pcg_skel_version', 'caveclient_version', 'datastack', 'ver', 'nucleus_table', 'cloudvolume_version', 'cloudvolume_path', 'pcg_skel_params', 'target_dir' + definition = """ + -> master + --- + description=NULL : varchar(1000) # details + meshparty_version: varchar(48) # version of meshparty installed when method was created + pcg_skel_version: varchar(48) # version of pcg_skel installed when method was created + caveclient_version: varchar(48) # version of caveclient installed when method was created + datastack: varchar(250) # name of datastack + ver: smallint # client materialization version + nucleus_table : varchar(250) # nucleus table in CAVEclient annotation service + cloudvolume_version: varchar(48) # version of cloudvolume installed when method was created + cloudvolume_path: varchar(250) # cloudvolume path + pcg_skel_params: varchar(1000) # JSON array passed to pcg_skel.pcg_skel_params. Note: use json.loads to recover dict. + target_dir: varchar(1000) # target directory for file + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ @schema -class Nucleus(djp.Manual): +class Materialization(djp.Lookup): definition = """ - # Nucleus detection from version 0 of the Allen Institute. Table name: 'nucleus_detection_v0' - nucleus_id : int unsigned # id of nucleus from the flat segmentation Equivalent to Allen: 'id'. - --- + ver : decimal(6,2) # materialization version """ - + class Info(djp.Part): + store = True definition = """ - # Detailed information from each nucleus_id - -> Materialization - -> master - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. + -> master --- - nucleus_x : int unsigned # x coordinate of nucleus centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - nucleus_y : int unsigned # y coordinate of nucleus centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - nucleus_z : int unsigned # z coordinate of nucleus centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. 
- volume=NULL : float # volume of the nucleus in um^3 + datastack : varchar(48) # datastack name from CAVE metadata + id=NULL : int # materialization ID from CAVE metadata + valid : tinyint # 1 if valid + expires_on=NULL : varchar(32) # date materialization expires from CAVE client (stored as varchar in SQL) + time_stamp : varchar(32) # timestamp materialization was generated (stored as varchar in SQL) + ts_inserted=CURRENT_TIMESTAMP : timestamp """ - - class Meta(djp.Part): + + @classproperty + def with_timestamps(cls): + return cls.proj(..., expires_on='str_to_date(expires_on, "%%Y-%%m-%%d %%H:%%i:%%s")', time_stamp='str_to_date(time_stamp, "%%Y-%%m-%%d %%H:%%i:%%s")') + + class Checkpoint(djp.Part): definition = """ - ->Materialization + name : varchar(48) # name of checkpoint --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. + -> master + description=NULL : varchar(450) # details """ + class MatV1(djp.Part, dj.Computed): + maker = True + definition = """ + -> master + -> dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization').Materialization + -> Materialization.Info + --- + valid=1 : tinyint # marks whether the materialization is valid. Defaults to 1. + description=null : varchar(256) # description of the materialization and source + timestamp : timestamp # marks the time at which the materialization service was started + """ + + class CAVE(djp.Part, dj.Computed): + maker = True + definition = """ + -> master + -> ImportMethod + -> Materialization.Info + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + @classproperty + def latest(cls): + return cls.aggr_max('ver') + + @classproperty + def available(cls): + return cls & (cls.Info.with_timestamps.proj(diff='datediff(expires_on, current_timestamp)') & 'diff > 0') + + @classproperty + def expired(cls): + return cls & (cls.Info.with_timestamps.proj(diff='datediff(expires_on, current_timestamp)') & 'diff < 0') + + @classproperty + def long_term_support(cls): + return cls & (cls.Info.with_timestamps.proj(diff='datediff(expires_on, time_stamp)') & 'diff >= 30') @schema -class FunctionalCoreg(djp.Manual): +class Nucleus(djp.Lookup): definition = """ - # ID's of cells from the table 'functional_coreg' - ->Materialization - ->Nucleus - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. - scan_session : smallint # session index for the mouse - scan_idx : smallint # number of TIFF stack file - unit_id : int # unit id from ScanSet.Unit - --- - centroid_x : int unsigned # x coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. + nucleus_id : int unsigned # id of segmented nucleus. """ - - class Meta(djp.Part): + + class Info(djp.Part): + definition = """ + -> Materialization + -> master + segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. 
+ --- + nucleus_x : int unsigned # x coordinate of nucleus centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) + nucleus_y : int unsigned # y coordinate of nucleus centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) + nucleus_z : int unsigned # z coordinate of nucleus centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) + supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. + volume=null : float # volume of the nucleus in um^3 + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + class MatV1(djp.Part): + definition = """ + -> master + -> dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization').Nucleus.Info + -> master.Info + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + class CAVE(djp.Part, dj.Computed): definition = """ - ->Materialization + -> master + -> ImportMethod + -> master.Info --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. - """ + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + @schema -class ProofreadSegment(djp.Manual): +class Segment(djp.Lookup): definition = """ - # Segment ID's of manually proofread neurons from 'proofreading_functional_coreg_v2' - ->Materialization - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. + segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. """ - - class Meta(djp.Part): + + class MatV1(djp.Part): + definition = """ + -> master + -> Nucleus.MatV1 + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + class Nucleus(djp.Part, dj.Computed): definition = """ - ->Materialization + -> master + -> Nucleus.Info --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. - """ + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ @schema -class ProofreadFunctionalCoregV2(djp.Manual): - definition = """ - # ID's of cells from the table 'proofreading_functional_coreg_v2' - ->Materialization - ->Nucleus - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. - scan_session : smallint # session index for the mouse - scan_idx : smallint # number of TIFF stack file - unit_id : int # unit id from ScanSet.Unit +class Exclusion(djp.Lookup): + enable_hashing = True + hash_name = 'exclusion_hash' + hashed_attrs = ['reason'] + definition = f""" + {hash_name} : varchar(6) --- - centroid_x : int unsigned # x coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. + reason : varchar(48) # reason for exclusion """ - - class Meta(djp.Part): - definition = """ - ->Materialization - --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. 
- """ + + contents = [ + {'reason': 'no data'}, + {'reason': 'no synapse data'} + ] @schema -class SynapseSegmentSource(djp.Manual): +class Synapse(djp.Lookup): definition = """ - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. - --- - include=1 : tinyint # 1 if included in synapse source, 0 if excluded from synapse source + synapse_id : bigint unsigned # synapse index within the segmentation """ + + class Info(djp.Part): + definition = """ + # Synapses from the table 'synapses_pni_2' + -> Segment.proj(primary_seg_id='segment_id') + secondary_seg_id : bigint unsigned # id of the segment that is synaptically paired to primary_segment_id. + -> master + --- + prepost : varchar(16) # whether the primary_seg_id is "presyn" or "postsyn" + synapse_x : int unsigned # x coordinate of synapse centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm). From Allen 'ctr_pt_position'. + synapse_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm). From Allen 'ctr_pt_position'. + synapse_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm). From Allen 'ctr_pt_position'. + synapse_size : int unsigned # (EM voxels) scaled by (4x4x40) + """ + + class MatV1(djp.Part): + definition = """ + -> master.Info + -> dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization').Synapse + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + class SegmentExclude(djp.Part): + definition = """ + -> Segment.proj(primary_seg_id='segment_id') + -> master + -> Exclusion + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + class CAVE(djp.Part, dj.Computed): + definition = """ + -> master.Info + -> ImportMethod + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ @schema -class Synapse(djp.Computed): +class Mesh(djp.Lookup): + hash_name = 'mesh_id' definition = """ - # Synapses from the table 'synapses_pni_2' - ->SynapseSegmentSource.proj(primary_seg_id='segment_id') - secondary_seg_id : bigint unsigned # id of the segment that is synaptically paired to primary_segment_id. - synapse_id : bigint unsigned # synapse index within the segmentation - --- - prepost : varchar(16) # whether the primary_seg_id is "presyn" or "postsyn" - synapse_x : int unsigned # x coordinate of synapse centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm). From Allen 'ctr_pt_position'. - synapse_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm). From Allen 'ctr_pt_position'. - synapse_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm). From Allen 'ctr_pt_position'. - synapse_size : int unsigned # (EM voxels) scaled by (4x4x40) - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. 
+ mesh_id : varchar(12) # unique identifier of a mesh """ + + class Object(djp.Part): + definition = """ + -> master + --- + n_vertices : int unsigned # number of vertices + n_faces : int unsigned # number of faces + mesh : # in-place path to the hdf5 mesh file + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ + + class MeshParty(djp.Part, dj.Computed): + enable_hashing = True + hash_name = 'mesh_id' + hashed_attrs = 'segment_id', 'import_method', 'ts_computed' + definition = """ + -> master + -> master.Object + -> Segment + -> ImportMethod + ts_computed : varchar(128) # timestamp (varchar) that mesh was downloaded/ computed + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ @schema -class AllenV1ColumnTypesSlanted(djp.Manual): +class Meshwork(djp.Lookup): + hash_name = 'meshwork_id' definition = """ - # ID's of cells from the table 'allen_v1_column_types_slanted' - ->Materialization - ->Nucleus - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. - --- - valid : tinyint # 1 = valid entry, 0 = invalid entry - classification_system : varchar(128) # method for classification - cell_type : varchar(128) # manually classified cell type - n_nuc : smallint # number of nuclei associated with segment_id (single somas n_nuc = 1) - centroid_x : int unsigned # x coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. + meshwork_id : varchar(12) # unique identifier of a meshwork object """ - - class Meta(djp.Part): + contents = [[0]] # default id for rows lacking object + + class PCGMeshwork(djp.Part): definition = """ - ->Materialization + -> master --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. - """ + meshwork_obj : # in-place path to the hdf5 file + """ + + class PCGMeshworkExclude(djp.Part): + definition = """ + -> Segment + -> Exclusion + -> master + ts_computed : varchar(128) # timestamp (varchar) that row was excluded + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp inserted + """ + + class PCGMeshworkMaker(djp.Part, dj.Computed): + enable_hashing = True + hash_name = 'meshwork_id' + hashed_attrs = 'segment_id', 'import_method', 'ts_computed' + definition = """ + -> master.PCGMeshwork + -> Segment + -> ImportMethod + ts_computed : varchar(128) # timestamp (varchar) that row was downloaded/ computed + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ @schema -class AllenSomaCourseCellClassModelV1(djp.Manual): +class Skeleton(djp.Lookup): + hash_name = 'skeleton_id' definition = """ - # ID's of cells from the table 'allen_soma_coarse_cell_class_model_v1' - ->Materialization - ->Nucleus - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. 
- --- - valid : tinyint # 1 = valid entry, 0 = invalid entry - classification_system : varchar(128) # method for classification - cell_type : varchar(128) # manually classified cell type - n_nuc : smallint # number of nuclei associated with segment_id (single somas n_nuc = 1) - centroid_x : int unsigned # x coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. + skeleton_id : varchar(12) # unique identifier of a skeleton """ - - class Meta(djp.Part): + contents = [[0]] # default id for rows lacking object + + class PCGSkeleton(djp.Part): definition = """ - ->Materialization + -> master --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. + skeleton_obj : # in-place path to the hdf5 file """ + class PCGSkeletonMaker(djp.Part, dj.Computed): + enable_hashing = True + hash_name = 'skeleton_id' + hashed_attrs = 'segment_id', 'import_method', 'ts_computed' + definition = """ + -> master.PCGSkeleton + -> Segment + -> ImportMethod + ts_computed : varchar(128) # timestamp (varchar) that row was downloaded/ computed + --- + ts_inserted=CURRENT_TIMESTAMP : timestamp + """ @schema -class AllenSomaCourseCellClassModelV2(djp.Manual): +class Queue(djp.Lookup): + hash_name = 'queue_id' definition = """ - # ID's of cells from the table 'allen_soma_coarse_cell_class_model_v2' - ->Materialization - ->Nucleus - segment_id : bigint unsigned # id of the segment under the nucleus centroid. Equivalent to Allen 'pt_root_id'. + queue_id : varchar(18) # id of queue entry --- - valid : tinyint # 1 = valid entry, 0 = invalid entry - classification_system : varchar(128) # method for classification - cell_type : varchar(128) # manually classified cell type - n_nuc : smallint # number of nuclei associated with segment_id (single somas n_nuc = 1) - centroid_x : int unsigned # x coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_y : int unsigned # y coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - centroid_z : int unsigned # z coordinate of centroid in EM voxels (x: 4nm, y: 4nm, z: 40nm) - supervoxel_id : bigint unsigned # id of the supervoxel under the nucleus centroid. Equivalent to Allen: 'pt_supervoxel_id'. + ts_inserted=CURRENT_TIMESTAMP : timestamp """ + + class PCGSkeleton(djp.Part): + enable_hashing = True + hash_name = 'queue_id' + hashed_attrs = 'segment_id', 'import_method' + hash_group = True - class Meta(djp.Part): definition = """ - ->Materialization - --- - description=NULL : varchar(256) # description of the table version - ts_inserted=CURRENT_TIMESTAMP : timestamp # timestamp that data was inserted into this DataJoint table. 
- """ - + -> Segment + -> ImportMethod + -> master + """ + + @classmethod + def add_rows(cls, rows): + cls.insert(rows, insert_to_master=True, ignore_extra_fields=True) + + class PCGMeshwork(djp.Part): + enable_hashing = True + hash_name = 'queue_id' + hashed_attrs = 'segment_id', 'import_method' + hash_group = True + + definition = """ + -> Segment + -> ImportMethod + -> master + """ + + @classmethod + def add_rows(cls, rows): + cls.insert(rows, insert_to_master=True, ignore_extra_fields=True) schema.spawn_missing_classes() -schema.connection.dependencies.load() \ No newline at end of file +schema.connection.dependencies.load() diff --git a/python/microns-materialization/microns_materialization/__init__.py b/python/microns-materialization/microns_materialization/__init__.py index 6667695..e1730cd 100644 --- a/python/microns-materialization/microns_materialization/__init__.py +++ b/python/microns-materialization/microns_materialization/__init__.py @@ -2,6 +2,7 @@ __version__ = version_utils.check_package_version( package='microns-materialization', + prefix='microns-materialization/python', check_if_latest=True, check_if_latest_kwargs=dict( owner='cajal', diff --git a/python/microns-materialization/microns_materialization/minnie_materialization/__init__.py b/python/microns-materialization/microns_materialization/minnie_materialization/__init__.py index 1712eb0..9465067 100644 --- a/python/microns-materialization/microns_materialization/minnie_materialization/__init__.py +++ b/python/microns-materialization/microns_materialization/minnie_materialization/__init__.py @@ -1,4 +1,4 @@ import datajoint.datajoint_plus as djp from . import minnie65_materialization -djp.reassign_master_attribute(minnie65_materialization) +djp.reassign_master_attribute(minnie65_materialization) \ No newline at end of file diff --git a/python/microns-materialization/microns_materialization/minnie_materialization/minnie65_materialization.py b/python/microns-materialization/microns_materialization/minnie_materialization/minnie65_materialization.py index eee0255..21b44e4 100644 --- a/python/microns-materialization/microns_materialization/minnie_materialization/minnie65_materialization.py +++ b/python/microns-materialization/microns_materialization/minnie_materialization/minnie65_materialization.py @@ -1,502 +1,769 @@ -import datajoint as dj -from datajoint import datajoint_plus as djp -from caveclient import CAVEclient - - -import numpy as np -import datetime -import pandas as pd -import traceback +import inspect +import json import sys +from datetime import datetime from pathlib import Path -import time -if 'ipykernel' in sys.modules: - from tqdm import tqdm_notebook as tqdm -else: - from tqdm import tqdm +import datajoint as dj +import datajoint_plus as djp +import numpy as np +import pandas as pd +import pcg_skel +from meshparty import trimesh_io # Schema creation -from microns_materialization_api.schemas import minnie65_materialization as m65mat +from microns_materialization_api.schemas import \ + minnie65_materialization as m65mat -class Materialization(m65mat.Materialization): - - class CurrentVersion(m65mat.Materialization.CurrentVersion): pass - - class Meta(m65mat.Materialization.Meta): pass +schema = m65mat.schema +config = m65mat.config + +logger = djp.getLogger(__name__) + +# Utils +from microns_utils.adapter_utils import adapt_mesh_hdf5 +from microns_utils.ap_utils import set_CAVE_client +from microns_utils.filepath_utils import (append_timestamp_to_filepath, + get_file_modification_time) +from 
microns_utils.misc_utils import wrap +from microns_utils.version_utils import \ + check_package_version_from_distributions as cpvfd +# TODO: Deal with filter out unrestricted + +class ImportMethod(m65mat.ImportMethod): @classmethod - def fill(cls, client, ver, description=None, update_latest_tag=True): - print('--> Fetching materialization timestamp...') - - # get materialization timestamp - ts = client.materialize.get_version_metadata()['time_stamp'] - - print('Inserting data...') - # insert materialization version and timestamp - cls.insert1({'ver': client.materialize.version, 'timestamp': ts}) - - # insert metadata - Materialization.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted timestamp: {ts} for materialization version: {client.materialize.version:.2f} with description: "{description}"') - - # Update latest tag - if update_latest_tag: - if ver == -1: - try: - Materialization.CurrentVersion.insert1({'kind': 'latest', 'ver': client.materialize.version}) - print(f'Set Materialization.CurrentVersion "latest" to materialization version {client.materialize.version:.2f}') - - except: - (Materialization.CurrentVersion & 'kind="latest"')._update('ver', client.materialize.version) - print(f'Updated Materialization.CurrentVersion "latest" to materialization version {client.materialize.version:.2f}') - - -stable = m65mat.stable -latest = m65mat.latest -MICrONS_Mar_2021 = m65mat.MICrONS_Mar_2021 -MICrONS_Jul_2021 = m65mat.MICrONS_Jul_2021 -releaseJun2021 = m65mat.releaseJun2021 - -def _version_mgr(client, ver, table=Materialization): - """ Modify the client to the user specified materialization version and check if the version is already in the DataJoint table. - - :param client: CAVEclient object that will be modified by _version_mgr. - - :param ver: User specified materialization version. - - returns: - code : 0 if the version already exists in the table - : 1 if the version does not exist in the table - - client : the client set to the user specified materialization version - """ - code=None - - # set version if desired version is not latest - if ver != -1: - if ver == client.materialize.most_recent_version(): - print(f'Materialization version {ver:.2f} is currently the latest version.') - - else: - client.materialize._version = ver - print(f'Materialization version set to {client.materialize.version:.2f}. This is not the latest version.') + def run(cls, key): + return cls.r1p(key).run(**key) + + @classmethod + def validate_method(cls, names, method_values, current_values): + results = [] + for name, mv, cv in zip(wrap(names), wrap(method_values), wrap(current_values)): + if mv != cv: + cls.Log('error', f"This method requires {name} to be {mv}, but currently is {cv}. Create a new method.") + results.append(0) + else: + results.append(1) + assert np.all(results), 'Method compatibility validation failed. Check logs.' 
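# --- Illustrative aside (not part of the diff above): the version-pinning idea behind
# ImportMethod.validate_method. Each method row stores the package versions present when the method
# was created; run() re-checks them against the installed environment and aborts on any mismatch.
# importlib.metadata is used here as a stand-in for
# microns_utils.version_utils.check_package_version_from_distributions, and the pins in the usage
# comment are hypothetical.
from importlib.metadata import version

def validate_versions(recorded):
    """recorded: dict mapping distribution name (e.g. 'caveclient') -> version string stored with the method."""
    mismatches = {name: (want, version(name)) for name, want in recorded.items() if version(name) != want}
    if mismatches:
        raise RuntimeError(f'Installed packages differ from the method record, create a new method: {mismatches}')

# Hypothetical usage: validate_versions({'caveclient': '4.15.2', 'cloud-volume': '8.5.4'})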
+ + class MaterializationVer(m65mat.ImportMethod.MaterializationVer): + @classmethod + def update_method(cls, ver=None, **kwargs): + cls.Log('info', f'Updating method for {cls.class_name}.') + + # DEFAULTS + datastack = 'minnie65_phase3_v1' + + # INSERT + client = set_CAVE_client(datastack, ver) + cls.insert1({ + 'caveclient_version': cpvfd('caveclient'), + 'datastack': datastack, + 'ver': ver if ver is not None else client.materialize.version, + }, ignore_extra_fields=True, insert_to_master=True, skip_duplicates=True) + + def run(self, **kwargs): + params = (self & kwargs).fetch1() + self.Log('info', f'Running {self.class_name} with params {params}.') + + # INITIALIZE & VALIDATE + client = set_CAVE_client(params['datastack'], ver=params['ver']) + self.master.validate_method( + names=('caveclient version', 'datastack', 'materialization_version'), + method_values=(params['caveclient_version'], params['datastack'], params['ver']), + current_values=(cpvfd('caveclient'), client.materialize.datastack_name, client.materialize.version) + ) + + # IMPORT DATA + data = client.materialize.get_version_metadata() + data.update({'ver': data['version']}) + return data - else: - print(f'Checking for new materialization...') + class NucleusSegment(m65mat.ImportMethod.NucleusSegment): + @classmethod + def update_method(cls, ver=None, **kwargs): + cls.Log('info', f'Updating method for {cls.class_name}.') + + # DEFAULTS + datastack = 'minnie65_phase3_v1' + + # INSERT + client = set_CAVE_client(datastack, ver) + cls.insert1({ + 'caveclient_version': cpvfd('caveclient'), + 'datastack': datastack, + 'ver': ver if ver is not None else client.materialize.version, + }, ignore_extra_fields=True, skip_duplicates=True, insert_to_master=True) + + def run(self, **kwargs): + params = (self & kwargs).fetch1() + self.Log('info', f'Running {self.class_name} with params {params}.') + + # INITIALIZE & VALIDATE + client = set_CAVE_client(params['datastack'], ver=params['ver']) + self.master.validate_method( + names=('caveclient version', 'datastack', 'materialization_version'), + method_values=(params['caveclient_version'], params['datastack'], params['ver']), + current_values=(cpvfd('caveclient'), client.materialize.datastack_name, client.materialize.version) + ) + + # IMPORT DATA + df = client.materialize.query_table('nucleus_detection_v0') + rename_dict = { + 'id': 'nucleus_id', + 'pt_root_id': 'segment_id', + 'pt_supervoxel_id': 'supervoxel_id' + } + df = df.rename(columns=rename_dict) + df['ver'] = params['ver'] + df['nucleus_x'], df['nucleus_y'], df['nucleus_z'] = np.stack(df.pt_position.values).T + df['import_method'] = params['import_method'] + return {'df': df} - # check if version exists in table - if len(table.Meta & f'ver={client.materialize.version}') > 0: - code = 0 - print(f'Materialization version {client.materialize.version:.2f} already in schema.') - return code, client + class MeshPartyMesh(m65mat.ImportMethod.MeshPartyMesh): + @classmethod + def update_method(cls, *args, **kwargs): + msg = f'{cls.class_name} has been deprecated. Use {cls.master.class_name}.MeshPartyMesh2.' + cls.Log('error', msg) + raise Exception(msg) + + def run(self, *args, **kwargs): + msg = f'{self.class_name} has been deprecated. Use {self.master.class_name}.MeshPartyMesh2.' 
+ self.Log('error', msg) + raise Exception(msg) + + class MeshPartyMesh2(m65mat.ImportMethod.MeshPartyMesh2): + @classmethod + def update_method(cls, ver=None, download_meshes_kwargs={}, **kwargs): + cls.Log('info', f'Updating method for {cls.class_name}.') + + # DEFAULTS + datastack = 'minnie65_phase3_v1' + download_meshes_kwargs.setdefault('overwrite', False) + download_meshes_kwargs.setdefault('n_threads', 10) + download_meshes_kwargs.setdefault('verbose', False) + download_meshes_kwargs.setdefault('stitch_mesh_chunks', True) + download_meshes_kwargs.setdefault('merge_large_components', False) + download_meshes_kwargs.setdefault('remove_duplicate_vertices', True) + download_meshes_kwargs.setdefault('map_gs_to_https', True) + download_meshes_kwargs.setdefault('fmt', "hdf5") + download_meshes_kwargs.setdefault('save_draco', False) + download_meshes_kwargs.setdefault('chunk_size', None) + download_meshes_kwargs.setdefault('progress', False) + + # INSERT + client = set_CAVE_client(datastack, ver) + cls.insert1( + { + 'description' : '', + 'meshparty_version': cpvfd('meshparty'), + 'cloudvolume_version': cpvfd('cloud-volume'), + 'caveclient_version': cpvfd('caveclient'), + 'datastack': datastack, + 'ver': ver if ver is not None else client.materialize.version, + 'cloudvolume_path': client.info.segmentation_source(), + 'download_meshes_kwargs': json.dumps(download_meshes_kwargs), + 'target_dir': m65mat.config.externals['minnie65_meshes']['location'], + + }, + insert_to_master=True, + skip_duplicates=True, + ) + + def run(self, **kwargs): + params = (self & kwargs).fetch1() + self.Log('info', f'Running {self.class_name} with params {params}.') + + # INITIALIZE & VALIDATE + client = set_CAVE_client(params['datastack'], params['ver']) + packages = { + 'meshparty_version': 'meshparty', + 'caveclient_version': 'caveclient', + 'cloudvolume_version': 'cloud-volume' + } + self.master.validate_method( + names=list(packages.keys()) + ['cloudvolume_path', 'datastack', 'materialization_version'], + method_values=[params[k] for k in packages.keys()] + [params['cloudvolume_path'], params['datastack'], params['ver']], + current_values=[cpvfd(v) for v in packages.values()] + [client.info.segmentation_source(), client.materialize.datastack_name, client.materialize.version] + ) - else: - code = 1 - print(f'Found new materialization version: {client.materialize.version:.2f}') - return code, client + # IMPORT DATA + segment_id = kwargs['segment_id'] + target_dir = params['target_dir'] + trimesh_io.download_meshes(seg_ids=wrap(segment_id), target_dir=target_dir, cv_path=params['cloudvolume_path'], **json.loads(params['download_meshes_kwargs'])) + # make file path + filepath = Path(target_dir).joinpath(str(segment_id)).with_suffix('.h5') -class Nucleus(m65mat.Nucleus): - - class Info(m65mat.Nucleus.Info): pass - - class Meta(m65mat.Nucleus.Meta): pass + # append timestamp to filepath + ts_computed = get_file_modification_time(filepath, timezone='US/Central', fmt="%Y-%m-%d_%H:%M:%S") + filepath = append_timestamp_to_filepath(filepath, ts_computed, return_filepath=True) + + # get mesh data + n_vertices, n_faces, info_dict = adapt_mesh_hdf5(filepath=filepath, parse_filepath_stem=True, filepath_has_timestamp=True, separator='__', as_lengths=True) + assert kwargs['segment_id'] == info_dict['segment_id'], 'segment_id in filepath does not match provided segment_id.' 
# sanity check + + info_dict['ts_computed'] = str(info_dict.pop('timestamp')) + info_dict['mesh'] = info_dict.pop('filepath') + + return {'n_vertices': n_vertices, 'n_faces': n_faces, **info_dict} - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... ') - # fetch nucleus dataframe - nuc_df = client.materialize.query_table('nucleus_detection_v0') - - # reformat nucleus dataframe - rename_dict = { - 'id': 'nucleus_id', - 'pt_root_id': 'segment_id', - 'pt_supervoxel_id': 'supervoxel_id' - } - - nuc_copy = nuc_df[['id', 'pt_supervoxel_id', 'pt_root_id', 'volume']].copy().rename(columns=rename_dict) - nuc_copy['ver'] = client.materialize.version - nuc_copy['nucleus_x'], nuc_copy['nucleus_y'], nuc_copy['nucleus_z'] = np.stack(nuc_df.pt_position.values).T - return nuc_copy + class Synapse(m65mat.ImportMethod.Synapse): + @classmethod + def update_method(cls, *args, **kwargs): + msg = f'{cls.class_name} has been deprecated. Use {cls.master.class_name}.Synapse2.' + cls.Log('error', msg) + raise Exception(msg) + + def run(self, *args, **kwargs): + msg = f'{self.class_name} has been deprecated. Use {self.master.class_name}.Synapse2.' + self.Log('error', msg) + raise Exception(msg) - @classmethod - def fill_synapse_segment_source(cls): - print('--> Filling SynapseSegmentSource table...') - to_insert = (dj.U('segment_id') & (cls.Info & 'segment_id>0')) - SynapseSegmentSource.proj() - SynapseSegmentSource.insert(to_insert) - print(f'Inserted {len(to_insert)} new segments.') + class Synapse2(m65mat.ImportMethod.Synapse2): + @classmethod + def update_method(cls, ver=None, **kwargs): + cls.Log('info', f'Updating method for {cls.class_name}.') + + # DEFAULTS + datastack = 'minnie65_phase3_v1' + + # INSERT + client = set_CAVE_client(datastack, ver) + cls.insert1({ + 'caveclient_version': cpvfd('caveclient'), + 'datastack': datastack, + 'ver': ver if ver is not None else client.materialize.version, + }, ignore_extra_fields=True, skip_duplicates=True, insert_to_master=True) + + def run(self, **kwargs): + params = (self & kwargs).fetch1() + self.Log('info', f'Running {self.class_name} with params {params}.') + + # INITIALIZE & VALIDATE + client = set_CAVE_client(params['datastack'], ver=params['ver']) + self.master.validate_method( + names=('caveclient version', 'datastack', 'materialization_version'), + method_values=(params['caveclient_version'], params['datastack'], params['ver']), + current_values=(cpvfd('caveclient'), client.materialize.datastack_name, client.materialize.version) + ) + + # IMPORT DATA + primary_seg_id = int(kwargs['primary_seg_id']) + + # get synapses where primary segment is presynaptic + df_pre = client.materialize.query_table('synapses_pni_2', filter_equal_dict={'pre_pt_root_id': primary_seg_id}) + df_pre = df_pre.rename(columns={'pre_pt_root_id':'primary_seg_id', 'post_pt_root_id': 'secondary_seg_id'}) + df_pre['prepost'] = 'presyn' + + # get synapses where primary segment is postsynaptic + df_post = client.materialize.query_table('synapses_pni_2', filter_equal_dict={'post_pt_root_id': primary_seg_id}) + df_post = df_post.rename(columns={'post_pt_root_id':'primary_seg_id', 'pre_pt_root_id': 'secondary_seg_id'}) + df_post['prepost'] = 'postsyn' + + # combine dataframes + df = pd.concat([df_pre, df_post], axis=0) + + # remove autapses (these are mostly errors) + df = df[df['primary_seg_id']!=df['secondary_seg_id']] + + if len(df)>0: + # add synapse_xyz + df['synapse_x'], df['synapse_y'], df['synapse_z'] = 
np.stack(df['ctr_pt_position'].T, -1) + rename_dict = { + 'id': 'synapse_id', + 'size':'synapse_size', + } + df = df.rename(columns=rename_dict)[['primary_seg_id', 'secondary_seg_id', 'synapse_id', \ + 'prepost', 'synapse_x', 'synapse_y', 'synapse_z', 'synapse_size']] + + df['import_method'] = params['import_method'] + + return {'df': df} + else: + return {'df': []} + + class PCGMeshwork(m65mat.ImportMethod.PCGMeshwork): + @classmethod + def update_method(cls, ver=None, pcg_meshwork_params={}, **kwargs): + cls.Log('info', f'Updating method for {cls.class_name}.') + + # DEFAULTS + datastack = 'minnie65_phase3_v1' + pcg_meshwork_params.setdefault('n_parallel', 10) + pcg_meshwork_params.setdefault('refine', 'all') + pcg_meshwork_params.setdefault('collapse_soma', True) + pcg_meshwork_params.setdefault('synapses', 'all') + pcg_meshwork_params.setdefault('root_point_resolution', [4,4,40]) + + # INSERT + client = set_CAVE_client(datastack, ver) + cls.insert1( + { + 'meshparty_version': cpvfd('meshparty'), + 'caveclient_version': cpvfd('caveclient'), + 'pcg_skel_version': cpvfd('pcg-skel'), + 'datastack': datastack, + 'ver': ver if ver is not None else client.materialize.version, + 'synapse_table': 'synapses_pni_2', + 'nucleus_table': 'nucleus_detection_v0', + 'cloudvolume_version': cpvfd('cloud-volume'), + 'cloudvolume_path': client.info.segmentation_source(), + 'pcg_meshwork_params': json.dumps(pcg_meshwork_params), + 'target_dir': m65mat.config.externals['minnie65_meshwork']['location'], + + }, + insert_to_master=True, + skip_duplicates=True, + ) + + def run(self, **kwargs): + params = (self & kwargs).fetch1() + self.Log('info', f'Running {self.class_name} with params {params}.') + + # INITIALIZE & VALIDATE + client = set_CAVE_client(params['datastack'], ver=params['ver']) + + # validate package dependencies + packages = { + 'meshparty_version': 'meshparty', + 'pcg_skel_version': 'pcg-skel', + 'caveclient_version': 'caveclient', + 'cloudvolume_version': 'cloud-volume' + } - @classmethod - def fill(cls, client, description=None): - print('--> Filling Nucleus table...') - - # fetch data from materialization service - df = cls.fetch_df(client) - - # insert nucleus id to master table - cls.insert(df[['nucleus_id']].to_records(index=False), skip_duplicates=True) + self.master.validate_method( + names=list(packages.keys()) + ['cloudvolume_path', 'datastack', 'materialization_version'], + method_values=[params[k] for k in packages.keys()] + [params['cloudvolume_path'], params['datastack'], params['ver']], + current_values=[cpvfd(v) for v in packages.values()] + [client.info.segmentation_source(), client.materialize.datastack_name, client.materialize.version] + ) + + # IMPORT DATA + segment_id = int(kwargs['segment_id']) + synapse_table = params['synapse_table'] + nucleus_table = params['nucleus_table'] + target_dir = params['target_dir'] + pcg_meshwork_params = json.loads(params['pcg_meshwork_params']) + + # check that segment has synapses + n_presyn = len(client.materialize.query_table(synapse_table, filter_equal_dict={'pre_pt_root_id': segment_id})) + n_postsyn = len(client.materialize.query_table(synapse_table, filter_equal_dict={'pre_pt_root_id': segment_id})) + if not (n_presyn > 0 or n_postsyn > 0): + self.Log('info', f'No synapses found for segment_id {segment_id} in {synapse_table}.') + return {'meshwork_obj': []} + + # check if segment has nucleus + nuc_df = client.materialize.query_table(nucleus_table, filter_equal_dict={'pt_root_id': segment_id}) + if len(nuc_df) > 0: + soma_centroid = 
nuc_df.pt_position.values[0] + else: + self.Log('info', f'No nucleus found for segment_id {segment_id} in {nucleus_table}.') + soma_centroid = None + + # download meshwork obj + meshwork_obj = pcg_skel.pcg_meshwork( + root_id=segment_id, + client=client, + root_point=soma_centroid, + synapse_table=synapse_table, + **pcg_meshwork_params + ) + + # make file path + filepath = Path(target_dir).joinpath(str(segment_id)).with_suffix('.h5') + + # save meshwork file + meshwork_obj.save_meshwork(filepath) + + # append timestamp to filepath + ts_computed = get_file_modification_time(filepath, timezone='US/Central', fmt="%Y-%m-%d_%H:%M:%S") + filepath = append_timestamp_to_filepath(filepath, ts_computed, return_filepath=True) - print('Inserting data...') - # insert nucleus dataframe to Info - cls.Info.insert(df.to_records(index=False)) - - # insert metadata - cls.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted nucleus information for materialization version: {client.materialize.version:.2f}') + return { + 'segment_id': segment_id, + 'import_method': params['import_method'], + 'meshwork_obj': filepath, + 'ts_computed': ts_computed + } + class PCGSkeleton(m65mat.ImportMethod.PCGSkeleton): + @classmethod + def update_method(cls, ver=None, pcg_skel_params={}, **kwargs): + cls.Log('info', f'Updating method for {cls.class_name}.') + + # DEFAULTS + datastack = 'minnie65_phase3_v1' + pcg_skel_params.setdefault('n_parallel', 10) + pcg_skel_params.setdefault('refine', 'all') + pcg_skel_params.setdefault('collapse_soma', True) + pcg_skel_params.setdefault('root_point_resolution', [4,4,40]) + + # INSERT + client = set_CAVE_client(datastack, ver) + cls.insert1( + { + 'meshparty_version': cpvfd('meshparty'), + 'caveclient_version': cpvfd('caveclient'), + 'pcg_skel_version': cpvfd('pcg-skel'), + 'datastack': datastack, + 'ver': ver if ver is not None else client.materialize.version, + 'synapse_table': 'synapses_pni_2', + 'nucleus_table': 'nucleus_detection_v0', + 'cloudvolume_version': cpvfd('cloud-volume'), + 'cloudvolume_path': client.info.segmentation_source(), + 'pcg_skel_params': json.dumps(pcg_skel_params), + 'target_dir': m65mat.config.externals['minnie65_pcg_skeletons']['location'], + + }, + insert_to_master=True, + ignore_extra_fields=True, + skip_duplicates=True, + ) + + def run(self, **kwargs): + params = (self & kwargs).fetch1() + self.Log('info', f'Running {self.class_name} with params {params}.') + + # INITIALIZE & VALIDATE + client = set_CAVE_client(params['datastack'], ver=params['ver']) + + # validate package dependencies + packages = { + 'meshparty_version': 'meshparty', + 'pcg_skel_version': 'pcg-skel', + 'caveclient_version': 'caveclient', + 'cloudvolume_version': 'cloud-volume' + } + self.master.validate_method( + names=list(packages.keys()) + ['cloudvolume_path', 'datastack', 'materialization_version'], + method_values=[params[k] for k in packages.keys()] + [params['cloudvolume_path'], params['datastack'], params['ver']], + current_values=[cpvfd(v) for v in packages.values()] + [client.info.segmentation_source(), client.materialize.datastack_name, client.materialize.version] + ) + + # IMPORT DATA + segment_id = int(kwargs['segment_id']) + nucleus_table = params['nucleus_table'] + target_dir = params['target_dir'] + pcg_skel_params = json.loads(params['pcg_skel_params']) + + # check if segment has nucleus + nuc_df = client.materialize.query_table(nucleus_table, filter_equal_dict={'pt_root_id': segment_id}) + if len(nuc_df) > 0: + 
soma_centroid = nuc_df.pt_position.values[0] + else: + self.Log('info', f'No nucleus found for segment_id {segment_id} in {nucleus_table}.') + soma_centroid = None + + # download skeleton obj + skeleton_obj = pcg_skel.pcg_skeleton( + root_id=segment_id, + client=client, + root_point=soma_centroid, + **pcg_skel_params + ) + + # make file path + filepath = Path(target_dir).joinpath(str(segment_id)).with_suffix('.h5') + + # save skeleton file + skeleton_obj.write_to_h5(filepath) + + # append timestamp to filepath + ts_computed = get_file_modification_time(filepath, timezone='US/Central', fmt="%Y-%m-%d_%H:%M:%S") + filepath = append_timestamp_to_filepath(filepath, ts_computed, return_filepath=True) + + return { + 'segment_id': segment_id, + 'import_method': params['import_method'], + 'skeleton_obj': filepath, + 'ts_computed': ts_computed + } -class FunctionalCoreg(m65mat.FunctionalCoreg): - - class Meta(m65mat.FunctionalCoreg.Meta): pass +class Materialization(m65mat.Materialization): - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... ') - - # fetch dataframes - fc_df = client.materialize.query_table('functional_coreg', filter_out_dict={'pt_root_id': [0]}) - nuc_df = client.materialize.query_table('nucleus_detection_v0', filter_out_dict={'pt_root_id': [0]}) - - # merge proofread and nucleus dataframes - rename_dict = { - 'id_x': 'nucleus_id', - 'pt_root_id': 'segment_id', - 'pt_position_y':'centroid', - 'pt_supervoxel_id_y': 'supervoxel_id', - 'session': 'scan_session' - } - fc_nuc_df_orig = pd.merge(nuc_df, fc_df, on='pt_root_id').rename(columns=rename_dict) - fc_nuc_df = fc_nuc_df_orig[['nucleus_id', 'segment_id', 'supervoxel_id', 'scan_session', 'scan_idx', 'unit_id']].copy() - fc_nuc_df['ver'] = client.materialize.version - fc_nuc_df['centroid_x'], fc_nuc_df['centroid_y'], fc_nuc_df['centroid_z'] = np.stack(fc_nuc_df_orig.centroid.values).T - return fc_nuc_df + class Info(m65mat.Materialization.Info): pass - @classmethod - def fill(cls, client, description=None): - print('--> Filling FunctionalCoreg table...') - - # fetch data from materialization service - df = cls.fetch_df(client) - - print('Inserting data...') - # insert to main table - cls.insert(df.to_records(index=False), skip_duplicates=True) + class Checkpoint(m65mat.Materialization.Checkpoint): + @classmethod + def add_checkpoint(cls, name, ver, description): + cls.insert1({ + 'name': name, + 'ver': ver, + 'description': description + } + ) + + @classmethod + def fill_mat_v1(cls): + cls.configure_logger() + mat_v1 = dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization') + rel = mat_v1.Materialization.CurrentVersion & [{'kind': 'MICrONS_Jul_2021'}, {'kind': 'MICrONS_Mar_2021'}, {'kind': 'release_Jun2021'}] + cls.insert(rel.proj(..., name='kind')) + + class MatV1(m65mat.Materialization.MatV1): + @property + def key_source(self): + return dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization').Materialization + + def get(self, key): + return (self.key_source & key).fetch1() + + def make(self, key): + row = self.get(key) + row.update({'time_stamp': row['timestamp'], 'datastack': 'minnie65_phase3_v1'}) + self.master.insert1(row, ignore_extra_fields=True, skip_duplicates=True) + self.master.Info.insert1(row, ignore_extra_fields=True, skip_duplicates=True) + self.insert1(row, ignore_extra_fields=True, skip_duplicates=True) + + class CAVE(m65mat.Materialization.CAVE): + @property + def key_source(self): + return 
ImportMethod.MaterializationVer - Materialization - # insert metadata - cls.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted functional coregistration for materialization version: {client.materialize.version:.2f}') - - -class ProofreadSegment(m65mat.ProofreadSegment): + def make(self, key): + result = {**key, **ImportMethod.run(key)} + Materialization.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + Materialization.Info.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.insert1(result, insert_to_master=True, ignore_extra_fields=True, skip_duplicates=True) - class Meta(m65mat.ProofreadSegment): pass - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... ') +class Nucleus(m65mat.Nucleus): + + class Info(m65mat.Nucleus.Info): pass + + class MatV1(m65mat.Nucleus.MatV1): + @classmethod + def fill(cls): + cls.configure_logger() + m65mat_v1 = dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization') + with dj.conn().transaction: + cls.master.insert(m65mat_v1.Nucleus.Info, ignore_extra_fields=True, skip_duplicates=True) + cls.master.Info.insert(m65mat_v1.Nucleus.Info, ignore_extra_fields=True, skip_duplicates=True) + cls.insert(m65mat_v1.Nucleus.Info, ignore_extra_fields=True, skip_duplicates=True) + + class CAVE(m65mat.Nucleus.CAVE): + @property + def key_source(self): + return ImportMethod.NucleusSegment & (Materialization & (Materialization.CAVE - Nucleus.Info.proj())) - # fetch dataframes - fc_df = client.materialize.query_table('proofreading_functional_coreg_v2', filter_out_dict={'pt_root_id': [0]}) - fc_df['ver'] = client.materialize.version - return fc_df[['ver','pt_root_id']].rename(columns={'pt_root_id':'segment_id'}).drop_duplicates().reset_index(drop=True) + def make(self, key): + df = ImportMethod.run(key)['df'] + self.master.insert(df, ignore_extra_fields=True, skip_duplicates=True) + self.master.Info.insert(df, ignore_extra_fields=True, skip_duplicates=True) + self.insert(df, insert_to_master=True, ignore_extra_fields=True, skip_duplicates=True, insert_to_master_kws={'ignore_extra_fields': True, 'skip_duplicates': True}) - @classmethod - def fill(cls, client, description=None): - print('--> Filling ProofreadSegment table...') - - # fetch data from materialization service - df = cls.fetch_df(client) - - print('Inserting data...') - # insert info to main table - cls.insert(df.to_records(index=False), skip_duplicates=True) - - # insert metadata - cls.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted proofread segments for materialization version: {client.materialize.version:.2f}') +class Segment(m65mat.Segment): -class ProofreadFunctionalCoregV2(m65mat.ProofreadFunctionalCoregV2): - - class Meta(m65mat.ProofreadFunctionalCoregV2.Meta): pass + class MatV1(m65mat.Segment.MatV1): + @classmethod + def fill(cls): + cls.master.insert(Nucleus.MatV1, ignore_extra_fields=True, skip_duplicates=True) + cls.insert(Nucleus.MatV1, ignore_extra_fields=True, skip_duplicates=True) - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... 
') + class Nucleus(m65mat.Segment.Nucleus): + @property + def key_source(self): + return Materialization & Nucleus.Info() - # fetch dataframes - fc_df = client.materialize.query_table('proofreading_functional_coreg_v2', filter_out_dict={'pt_root_id': [0]}) - nuc_df = client.materialize.query_table('nucleus_detection_v0', filter_out_dict={'pt_root_id': [0]}) - - # merge proofread and nucleus dataframes - rename_dict = { - 'id_x': 'nucleus_id', - 'pt_root_id': 'segment_id', - 'pt_position_y':'centroid', - 'pt_supervoxel_id_y': 'supervoxel_id', - 'session': 'scan_session' - } - fc_nuc_df_orig = pd.merge(nuc_df, fc_df, on='pt_root_id').rename(columns=rename_dict) - fc_nuc_df = fc_nuc_df_orig[['nucleus_id', 'segment_id', 'supervoxel_id', 'scan_session', 'scan_idx', 'unit_id']].copy() - fc_nuc_df['ver'] = client.materialize.version - fc_nuc_df['centroid_x'], fc_nuc_df['centroid_y'], fc_nuc_df['centroid_z'] = np.stack(fc_nuc_df_orig.centroid.values).T - return fc_nuc_df + def make(self, key): + self.master.insert(Nucleus.Info & key, ignore_extra_fields=True, skip_duplicates=True) + self.insert(Nucleus.Info & key, ignore_extra_fields=True, skip_duplicates=True) -class SynapseSegmentSource(m65mat.SynapseSegmentSource): pass +class Exclusion(m65mat.Exclusion): pass class Synapse(m65mat.Synapse): + + class Info(m65mat.Synapse.Info): pass - client = None - - @property - def key_source(self): - return (SynapseSegmentSource & {'include': 1}).proj(primary_seg_id='segment_id') + class MatV1(m65mat.Synapse.MatV1): + @classmethod + def fill(cls): + m65mat_v1 = dj.create_virtual_module('mat_v1', 'microns_minnie65_materialization') + with dj.conn().transaction: + cls.master.insert(m65mat_v1.Synapse, ignore_extra_fields=True, skip_duplicates=True) + cls.master.Info.insert(m65mat_v1.Synapse, ignore_extra_fields=True, skip_duplicates=True) + cls.insert(m65mat_v1.Synapse, ignore_extra_fields=True, skip_duplicates=True) + + class SegmentExclude(m65mat.Synapse.SegmentExclude): pass + + class CAVE(m65mat.Synapse.CAVE): + @property + def key_source(self): + return (Segment.proj(primary_seg_id='segment_id') - Synapse.Info - Synapse.SegmentExclude) * ImportMethod.Synapse2 + + def make(self, key): + df = ImportMethod.run(key)['df'] + if len(df) > 0: + self.master.insert(df, ignore_extra_fields=True, skip_duplicates=True) + self.master.Info.insert(df, ignore_extra_fields=True, skip_duplicates=True) + self.insert(df, ignore_extra_fields=True) + else: + self.master.SegmentExclude.insert1({'primary_seg_id': key['primary_seg_id'], 'synapse_id': 0, Exclusion.hash_name: Exclusion.hash1({'reason': 'no synapse data'})}, skip_duplicates=True) + + +class Mesh(m65mat.Mesh): - @classmethod - def initialize_client(cls, ver='latest'): - print(f'--> Initializing client...') - - client = CAVEclient('minnie65_phase3_v1') - - if ver == 'latest': - ver = client.materialize.most_recent_version() - print(f'Latest version specified.') - - client.materialize._version = ver - print(f'Version set to: {ver:.2f}') - - cls.client = client - - print(f'Client initialized.') + class Object(m65mat.Mesh.Object): pass - def make(self, key): - primary_seg_id = np.int(key['primary_seg_id']) - - # get synapses where primary segment is presynaptic - df_pre = self.client.materialize.query_table('synapses_pni_2', filter_equal_dict={'pre_pt_root_id': primary_seg_id}) - df_pre = df_pre.rename(columns={'pre_pt_root_id':'primary_seg_id', 'post_pt_root_id': 'secondary_seg_id'}) - df_pre['prepost'] = 'presyn' - - # get synapses where primary segment is postsynaptic 
- df_post = self.client.materialize.query_table('synapses_pni_2', filter_equal_dict={'post_pt_root_id': primary_seg_id}) - df_post = df_post.rename(columns={'post_pt_root_id':'primary_seg_id', 'pre_pt_root_id': 'secondary_seg_id'}) - df_post['prepost'] = 'postsyn' - - # combine dataframes - df = pd.concat([df_pre, df_post], axis=0) + class MeshParty(m65mat.Mesh.MeshParty): + @property + def key_source(self): + return ((Segment & Segment.Nucleus & 'segment_id!= 0') - Mesh.MeshParty.proj()) * ImportMethod.MeshPartyMesh2 - # remove autapses (these are mostly errors) - df = df[df['primary_seg_id']!=df['secondary_seg_id']] - - if len(df)>0: - # add synapse_xyz - df['synapse_x'], df['synapse_y'], df['synapse_z'] = np.stack(df['ctr_pt_position'].T, -1) + def make(self, key): + result = {**key, **ImportMethod.run(key)} + result = {**{self.hash_name: self.hash1(result)}, **result} + self.master.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.master.Object.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.insert1(result, insert_to_master=True, skip_hashing=True, ignore_extra_fields=True, skip_duplicates=True, insert_to_master_kws={'ignore_extra_fields': True, 'skip_duplicates': True}) - rename_dict = { - 'id': 'synapse_id', - 'size':'synapse_size', - } - - final_df = df.rename(columns=rename_dict)[['primary_seg_id', 'secondary_seg_id', 'synapse_id', \ - 'prepost', 'synapse_x', 'synapse_y', 'synapse_z', 'synapse_size']] - self.insert(final_df.to_records(index=False)) - - else: - # print(f'No synapses found for primary_seg_id: {primary_seg_id}. Updating SynapseSegmentSource with "include=0".') - (SynapseSegmentSource & {'segment_id': primary_seg_id})._update('include', 0) - - -def fetch_materialization(ver=-1, update_latest_tag=True): - # check that client has an auth token - try: - client = CAVEclient('minnie65_phase3_v1') - except: - traceback.print_exc() - client = CAVEclient() - client.auth.get_new_token() - token = f"{input(prompt='Paste token (without quotes):')}" - client.auth.save_token(token=token, overwrite=True) - print('Checking token...') - - try: - client = CAVEclient('minnie65_phase3_v1') - except: - print('Didnt work. 
Try running "client.auth.save_token(token="PASTE_TOKEN_HERE", overwrite=True)" inside the notebook.') - return - print('Authentication successful.') - - # handle materialization version - code, client = _version_mgr(client=client, ver=ver) - - if code == 0: - return - - print('Fetching materialization...') - previous_version = (Materialization.CurrentVersion() & {'kind':"latest"}).fetch1('ver') - - try: - # run fill method for tables - Materialization.fill(client=client, ver=ver, update_latest_tag=update_latest_tag) - Nucleus.fill(client=client) - Nucleus.fill_synapse_segment_source() - FunctionalCoreg.fill(client=client) - ProofreadSegment.fill(client=client) - ProofreadFunctionalCoregV2.fill(client=client) - Synapse.initialize_client(ver=client.materialize.version) - print('--> Filling Synapse table with latest segments from SynapseSegmentSource...') - Synapse.populate(display_progress=True, suppress_errors=True) - print('Ensure that no errors occured during Synapse download by checking: "schema.jobs" table') - print('Done.') - - except: - print('The materialization download failed with the following error:') - traceback.print_exc() - print(f'Resetting "latest version" to previous version: {previous_version:.2f}...') - (Materialization.CurrentVersion() & {'kind':"latest"})._update('ver', previous_version) - print(f'Successfully reset "latest version" to {previous_version:.2f}.') - print('Do you want to delete partial data?') - (Materialization & {'ver': client.materialize.version}).delete() - - -## ADDITIONAL TABLES - -class AllenV1ColumnTypesSlanted(m65mat.AllenV1ColumnTypesSlanted): - - class Meta(m65mat.AllenV1ColumnTypesSlanted.Meta): pass - - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... 
') - - # fetch dataframes - nuc_df = client.materialize.query_table('nucleus_detection_v0', split_positions=True, filter_out_dict={'pt_root_id': [0]}) - col_df = client.materialize.query_table('allen_v1_column_types_slanted', split_positions=True, filter_out_dict={'pt_root_id': [0]}) - merge_df = col_df.merge(nuc_df, on=['pt_root_id', 'valid']) - - # merge proofread and nucleus dataframes - rename_dict = { - 'id_y': 'nucleus_id', - 'pt_root_id': 'segment_id', - 'pt_position_x_y': 'centroid_x', - 'pt_position_y_y': 'centroid_y', - 'pt_position_z_y': 'centroid_z', - 'pt_supervoxel_id_y': 'supervoxel_id' - } - merge_df2 = merge_df.merge(merge_df.groupby(['pt_root_id'], as_index=False).nunique()[['pt_root_id', 'id_y']].rename(columns={'id_y': 'n_nuc'})) - final_df = merge_df2.rename(columns=rename_dict) - final_df['ver'] = client.materialize.version - final_df['valid'] = 1*(final_df.valid.values == 't') - - return final_df - - @classmethod - def fill(cls, client, description=None): - print('--> Filling AllenV1ColumnTypesSlanted table...') - - # fetch data from materialization service - df = cls.fetch_df(client) +class Meshwork(m65mat.Meshwork): + + class PCGMeshwork(m65mat.Meshwork.PCGMeshwork): pass + + class PCGMeshworkExclude(m65mat.Meshwork.PCGMeshworkExclude): pass - print('Inserting data...') - # insert info to main table - cls.insert(df.to_records(index=False), skip_duplicates=True, ignore_extra_fields=True) + class PCGMeshworkMaker(m65mat.Meshwork.PCGMeshworkMaker): + @property + def key_source(self): + return ((Segment & Segment.Nucleus & 'segment_id!= 0') - Meshwork.PCGMeshworkMaker.proj() - Meshwork.PCGMeshworkExclude.proj()) * ImportMethod.PCGMeshwork.get_latest_entries() - # insert metadata - cls.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted data for materialization version: {client.materialize.version:.2f}') + def make(self, key): + result = {**key, **ImportMethod.run(key)} + if result['meshwork_obj']: + result = {**{self.hash_name: self.hash1(result)}, **result} + self.master.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.master.PCGMeshwork.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.insert1(result, insert_to_master=True, skip_hashing=True, ignore_extra_fields=True, skip_duplicates=True, insert_to_master_kws={'ignore_extra_fields': True, 'skip_duplicates': True}) + else: + self.master.PCGMeshworkExclude.insert1({'segment_id': key['segment_id'], 'meshwork_id': 0, Exclusion.hash_name: Exclusion.hash1({'reason': 'no synapse data'}), 'ts_computed': str(datetime.now())}, ignore_extra_fields=True, skip_duplicates=True) -class AllenSomaCourseCellClassModelV1(m65mat.AllenSomaCourseCellClassModelV1): - - class Meta(m65mat.AllenSomaCourseCellClassModelV1.Meta): pass - - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... 
') +class Skeleton(m65mat.Skeleton): + + class PCGSkeleton(m65mat.Skeleton.PCGSkeleton): pass - # fetch dataframes - nuc_df = client.materialize.query_table('nucleus_detection_v0', split_positions=True, filter_out_dict={'pt_root_id': [0]}) - col_df = client.materialize.query_table('allen_soma_coarse_cell_class_model_v1', split_positions=True, filter_out_dict={'pt_root_id': [0]}) - merge_df = col_df.merge(nuc_df, on=['pt_root_id', 'valid']) - - # merge proofread and nucleus dataframes - rename_dict = { - 'id_y': 'nucleus_id', - 'pt_root_id': 'segment_id', - 'pt_position_x_y': 'centroid_x', - 'pt_position_y_y': 'centroid_y', - 'pt_position_z_y': 'centroid_z', - 'pt_supervoxel_id_y': 'supervoxel_id' - } - merge_df2 = merge_df.merge(merge_df.groupby(['pt_root_id'], as_index=False).nunique()[['pt_root_id', 'id_y']].rename(columns={'id_y': 'n_nuc'})) - final_df = merge_df2.rename(columns=rename_dict) - final_df['ver'] = client.materialize.version - final_df['valid'] = 1*(final_df.valid.values == 't') - - return final_df + class PCGSkeletonMaker(m65mat.Skeleton.PCGSkeletonMaker): + @property + def key_source(self): + return ((Segment & Segment.Nucleus & 'segment_id!= 0') - Skeleton.PCGSkeletonMaker.proj()) * ImportMethod.PCGSkeleton.get_latest_entries() + + def make(self, key): + result = {**key, **ImportMethod.run(key)} + result = {**{self.hash_name: self.hash1(result)}, **result} + self.master.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.master.PCGSkeleton.insert1(result, ignore_extra_fields=True, skip_duplicates=True) + self.insert1(result, insert_to_master=True, skip_hashing=True, ignore_extra_fields=True, skip_duplicates=True, insert_to_master_kws={'ignore_extra_fields': True, 'skip_duplicates': True}) + + +class Queue(m65mat.Queue): + + class PCGMeshwork(m65mat.Queue.PCGMeshwork): pass - @classmethod - def fill(cls, client, description=None): - print('--> Filling AllenSomaCourseCellClassModelV1 table...') - - # fetch data from materialization service - df = cls.fetch_df(client) - - print('Inserting data...') - # insert info to main table - cls.insert(df.to_records(index=False), skip_duplicates=True, ignore_extra_fields=True) - - # insert metadata - cls.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted data for materialization version: {client.materialize.version:.2f}') + class PCGSkeleton(m65mat.Queue.PCGSkeleton): pass -class AllenSomaCourseCellClassModelV2(m65mat.AllenSomaCourseCellClassModelV2): - - class Meta(m65mat.AllenSomaCourseCellClassModelV2.Meta): pass +def update_log_level(loglevel, update_root_level=True): + """ + Updates module and root logger. + + :param loglevel: (str) desired log level to overwrite default + :param update_root_level: (bool) updates root level with provided loglevel + """ + logger = djp.getLogger(__name__, level=loglevel, update_root_level=update_root_level) + logger.info(f'Logging level set to {loglevel}.') - @classmethod - def fetch_df(cls, client): - print(f'Fetching data for materialization version {client.materialize.version:.2f}... 
') - - # fetch dataframes - nuc_df = client.materialize.query_table('nucleus_detection_v0', split_positions=True, filter_out_dict={'pt_root_id': [0]}) - col_df = client.materialize.query_table('allen_soma_coarse_cell_class_model_v2', split_positions=True, filter_out_dict={'pt_root_id': [0]}) - merge_df = col_df.merge(nuc_df, on=['pt_root_id', 'valid']) - - # merge proofread and nucleus dataframes - rename_dict = { - 'id_y': 'nucleus_id', - 'pt_root_id': 'segment_id', - 'pt_position_x_y': 'centroid_x', - 'pt_position_y_y': 'centroid_y', - 'pt_position_z_y': 'centroid_z', - 'pt_supervoxel_id_y': 'supervoxel_id' - } - merge_df2 = merge_df.merge(merge_df.groupby(['pt_root_id'], as_index=False).nunique()[['pt_root_id', 'id_y']].rename(columns={'id_y': 'n_nuc'})) - final_df = merge_df2.rename(columns=rename_dict) - final_df['ver'] = client.materialize.version - final_df['valid'] = 1*(final_df.valid.values == 't') - - return final_df + # update tables in module + # TODO - write this recursively + logger.info('Updating loglevel for all tables.') + for name, obj in inspect.getmembers(sys.modules[__name__]): + if name in ['key_source', '_master', 'master', 'UserTable']: + continue + if hasattr(obj, 'loglevel'): + obj.loglevel = loglevel + for partname, subobj in inspect.getmembers(obj): + if partname in ['key_source', '_master', 'master', 'UserTable']: + continue + if hasattr(subobj, 'loglevel'): + subobj.loglevel = loglevel + + +def download_materialization(ver=None, download_synapses=False, download_meshes=False, loglevel=None, update_root_level=True): + """ + Downloads materialization from CAVE. + + :param ver: (int) materialization version to download + If None, latest materialization is downloaded. + :param loglevel: (str) Optional, desired log level to overwrite default + :param update_root_level: (bool) updates root level with provided loglevel + """ + logger.info(f'Materialization download initialized.') - @classmethod - def fill(cls, client, description=None): - print('--> Filling AllenSomaCourseCellClassModelV2 table...') - - # fetch data from materialization service - df = cls.fetch_df(client) - - print('Inserting data...') - # insert info to main table - cls.insert(df.to_records(index=False), skip_duplicates=True, ignore_extra_fields=True) - - # insert metadata - cls.Meta.insert1({'ver': client.materialize.version, 'description': description}) - print(f'Successfully inserted data for materialization version: {client.materialize.version:.2f}') \ No newline at end of file + if loglevel is not None: + update_log_level(loglevel=loglevel, update_root_level=update_root_level) + + methods = [ + ImportMethod.MaterializationVer, + ImportMethod.NucleusSegment, + ] + + makers = [ + Materialization.CAVE, + Nucleus.CAVE, + Segment.Nucleus, + ] + + if download_synapses: + methods += [ImportMethod.Synapse2] + makers += [Synapse.CAVE] + + if download_meshes: + methods += [ImportMethod.MeshPartyMesh2] + makers += [Mesh.MeshParty] + + for m in methods: + logger.info(f'Updating methods for {m.class_name}.') + m.update_method(ver=ver) + + for mk in makers: + logger.info(f'Populating {mk.class_name}.') + mk.populate(m.master & m.get_latest_entries(), reserve_jobs=True, order='random', suppress_errors=True) + + +def download_meshwork_objects(restriction={}, loglevel=None, update_root_level=True): + """ + Downloads meshwork objects from cloud-volume. 
+
+    :param restriction: restriction to pass to populate
+    :param loglevel: (str) Optional, desired log level to overwrite default
+    :param update_root_level: (bool) updates root level with provided loglevel
+    """
+    logger.info(f'Meshwork download initialized.')
+
+    if loglevel is not None:
+        update_log_level(loglevel=loglevel, update_root_level=update_root_level)
+
+    Meshwork.PCGMeshworkMaker.populate(restriction, reserve_jobs=True, order='random', suppress_errors=True)
+
+
+def download_pcg_skeletons(restriction={}, loglevel=None, update_root_level=True):
+    """
+    Downloads PCG skeletons.
+
+    :param restriction: restriction to pass to populate
+    :param loglevel: (str) Optional, desired log level to overwrite default
+    :param update_root_level: (bool) updates root level with provided loglevel
+    """
+    logger.info('PCG skeleton download initialized.')
+
+    if loglevel is not None:
+        update_log_level(loglevel=loglevel, update_root_level=update_root_level)
+
+    Skeleton.PCGSkeletonMaker.populate(restriction, reserve_jobs=True, order='random', suppress_errors=True)
diff --git a/python/version.py b/python/version.py
index 00ec2dc..9b36b86 100644
--- a/python/version.py
+++ b/python/version.py
@@ -1 +1 @@
-__version__ = "0.0.9"
+__version__ = "0.0.10"
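
Reviewer note: a minimal usage sketch of the new download entry points added above. The import alias, the loglevel value, and the choice to pass the Queue part tables as populate restrictions are assumptions for illustration, not part of this diff; it presumes the package is installed and DataJoint/CAVE credentials are already configured.

# Sketch only -- credential and environment setup are assumed to happen elsewhere.
from microns_materialization.minnie_materialization import minnie65_materialization as m65

# Pull the latest materialization (ver=None), including synapse and mesh downloads.
m65.download_materialization(ver=None, download_synapses=True, download_meshes=True, loglevel='INFO')

# Queue-driven downloads: restrict the makers to segments staged in the Queue part tables.
m65.download_meshwork_objects(restriction=m65.Queue.PCGMeshwork, loglevel='INFO')
m65.download_pcg_skeletons(restriction=m65.Queue.PCGSkeleton, loglevel='INFO')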
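The diff also introduces Materialization.Checkpoint with an add_checkpoint helper. A hedged sketch of tagging a downloaded version as a named checkpoint follows; the name, version number, and description are illustrative values only, not defaults defined by this change.

from microns_materialization.minnie_materialization.minnie65_materialization import Materialization

# Illustrative values -- substitute the materialization version actually downloaded.
Materialization.Checkpoint.add_checkpoint(
    name='post_download_checkpoint',
    ver=343,
    description='Checkpoint recorded after a full synapse + mesh download',
)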