-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatasets.py
126 lines (110 loc) · 4.25 KB
/
datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jan 14 13:35:10 2024
@author: georgioskeliris
"""
import pandas as pd
import numpy as np
import suite2p
from ScanImageTiffReader import ScanImageTiffReader
defaultCsvPath = '/mnt/NAS_UserStorage/georgioskeliris/MECP2TUN/MECP2_datasets.csv'
def datasetQuery(csvFilePath=defaultCsvPath,
                 cohort=None, week=None, mouseID=None, ses=None, experiment=None):
    """Return the rows of the datasets CSV that match the given filters.

    The CSV is read with ``pandas.read_csv`` and must provide the columns
    ``cohort``, ``week``, ``mouseID`` and ``expID``; ``session`` and ``datID``
    are additionally read when ``ses`` is given.

    Parameters
    ----------
    csvFilePath : path of the datasets CSV file.
    cohort, week, mouseID : a single value or a list-like of accepted values
        for the column of the same name. ``None`` or an empty collection
        (the historical ``[]`` default) means "no restriction".
    ses : optional session selector. A string longer than 4 characters must
        equal the ``session`` value exactly; otherwise it is compared with
        the first 4 characters of each ``session`` value.
    experiment : optional experiment name. Rows whose ``expID`` equals the
        name itself or the name followed by 2, 3, 4 or 5 are kept. ``None``
        keeps every ``expID``.

    Returns
    -------
    pandas.DataFrame holding the matching rows (original index preserved).
    """
    df = pd.read_csv(csvFilePath)

    def _accepted(selection, column):
        # Turn one filter argument into the list of values passed to isin().
        if selection is not None and not pd.api.types.is_list_like(selection):
            # Bare scalar (str, int, ...): isin() needs a list-like.
            return [selection]
        values = [] if selection is None else list(selection)
        # An empty selection means "accept whatever the CSV contains".
        return values if values else list(df[column].unique())

    cohort = _accepted(cohort, "cohort")
    week = _accepted(week, "week")
    mouseID = _accepted(mouseID, "mouseID")

    if experiment is None:
        exps = list(df["expID"].unique())
    else:
        # Repeated runs of one experiment carry a numeric suffix (2..5).
        exps = [experiment] + [experiment + str(n) for n in range(2, 6)]

    ds = df[df["cohort"].isin(cohort) & df["week"].isin(week) &
            df["mouseID"].isin(mouseID) & df["expID"].isin(exps)]

    if ses is not None:
        if len(ses) > 4:
            # Long selector: exact session name.
            ds = ds[ds["session"] == ses]
        else:
            # Short selector: match the 4-character session prefix, then keep
            # every row that shares a datID with a matching row.
            datIDs = [dat_id
                      for dat_id, session in zip(ds["datID"], ds["session"])
                      if session[0:4] == ses]
            ds = ds[ds["datID"].isin(datIDs)]
    return ds
def getDataPaths(cohort, week, experiment,
                 csvFilePath='/mnt/Toshiba_16TB_1/MECP2_datasets.csv'):
    """Return the CSV rows for one cohort, week and experiment.

    Parameters
    ----------
    cohort, week : values compared for equality with the ``cohort`` and
        ``week`` columns.
    experiment : experiment name; rows whose ``expID`` is the name itself or
        the name followed by 2, 3, 4 or 5 are kept.
    csvFilePath : path of the datasets CSV file. Defaults to the location
        that used to be hard-coded, so existing callers are unaffected.

    Returns
    -------
    pandas.DataFrame holding the matching rows (original index preserved).
    """
    df = pd.read_csv(csvFilePath)
    # Repeated runs of one experiment carry a numeric suffix (2..5).
    exps = [experiment] + [experiment + str(n) for n in range(2, 6)]
    selected = ((df["cohort"] == cohort) & (df["week"] == week) &
                df["expID"].isin(exps))
    return df[selected]
def getOneExpPath(cohort, week, mouseID, experiment,
                  csvFilePath='/mnt/Toshiba_16TB_1/MECP2_datasets.csv'):
    """Return the CSV rows for one cohort, week, mouse and experiment.

    Parameters
    ----------
    cohort, week, mouseID : values compared for equality with the columns of
        the same name.
    experiment : experiment name; rows whose ``expID`` is the name itself or
        the name followed by 2, 3, 4 or 5 are kept.
    csvFilePath : path of the datasets CSV file. Defaults to the location
        that used to be hard-coded, so existing callers are unaffected.

    Returns
    -------
    pandas.DataFrame holding the matching rows (original index preserved).
    """
    df = pd.read_csv(csvFilePath)
    # Repeated runs of one experiment carry a numeric suffix (2..5).
    exps = [experiment] + [experiment + str(n) for n in range(2, 6)]
    selected = ((df["cohort"] == cohort) & (df["week"] == week) &
                (df["mouseID"] == mouseID) & df["expID"].isin(exps))
    return df[selected]
def getOneSesPath(cohort, week, mouseID, sesX, experiment):
    """Return the rows of one session within a single experiment.

    The candidate rows come from ``getOneExpPath(cohort, week, mouseID,
    experiment)``. The first row whose ``session`` value starts with ``sesX``
    (compared against its first 4 characters) selects the session, and every
    candidate row with exactly that ``session`` value is returned.

    NOTE: when no session matches, the unfiltered result of
    ``getOneExpPath`` is returned unchanged (existing behaviour, kept for
    callers that rely on it).
    """
    ds = getOneExpPath(cohort, week, mouseID, experiment)
    for session in ds["session"]:
        if session[0:4] == sesX:
            # Only the first matching session is used.
            return ds[ds["session"] == session]
    return ds
def checkDatasets(ds_path):
    """Try to open every tif/tiff file of a dataset and report the outcome.

    Files are listed with ``suite2p.io.utils.list_files`` and each one is
    opened with ``ScanImageTiffReader``. A file that opens counts as a pass;
    any exception raised while opening counts as a failure. The result of
    every file is printed as it is checked.

    Parameters
    ----------
    ds_path : dataset folder handed to ``suite2p.io.utils.list_files``.

    Returns
    -------
    dict with the keys
        ``'PASS'`` / ``'FAIL'``         : last 8 characters of each file path,
        ``'PASS_ind'`` / ``'FAIL_ind'`` : position of each file in the listing,
        ``'EXCEPTION'``                 : exception object of each failed file
                                          (same order as ``'FAIL'``).
    """
    fs, _ = suite2p.io.utils.list_files(ds_path, False, ["*.tif", "*.tiff"])
    print('\n\nCHECKING DATA INTEGRITY:')
    out = {'PASS': [], 'PASS_ind': [], 'FAIL': [], 'FAIL_ind': [], 'EXCEPTION': []}
    for idx, tif_path in enumerate(fs):
        label = tif_path[-8:]
        try:
            # Opening is the whole check; the context manager closes the
            # reader again so handles do not pile up across files.
            with ScanImageTiffReader(tif_path):
                pass
        except Exception as error:
            print(label, '\t-> CORRUPTED')
            out['FAIL'].append(label)
            out['FAIL_ind'].append(idx)
            out['EXCEPTION'].append(error)
            print("\t\t\tAn exception occurred:", type(error).__name__, "-", error)
        else:
            print(label, '\t-> OK')
            out['PASS'].append(label)
            out['PASS_ind'].append(idx)
    return out
def checkTifFile(ds_path, fileIndex):
    """Check whether a single tif/tiff file of a dataset can be opened.

    Parameters
    ----------
    ds_path : dataset folder handed to ``suite2p.io.utils.list_files``.
    fileIndex : position of the file in the listing returned for ``ds_path``.

    Returns
    -------
    bool : ``True`` when ``ScanImageTiffReader`` opens the file, ``False``
    when opening raises any exception. The outcome is also printed.
    """
    fs, _ = suite2p.io.utils.list_files(ds_path, False, ["*.tif", "*.tiff"])
    print('\n\nCHECKING DATA INTEGRITY:')
    # Looked up outside the try so a bad index is not reported as corruption
    # any differently than before: it raises IndexError to the caller.
    tif_path = fs[fileIndex]
    try:
        # The context manager closes the reader once the open succeeded.
        with ScanImageTiffReader(tif_path):
            pass
    except Exception:
        print(tif_path[-8:], '\t-> CORRUPTED')
        return False
    print(tif_path[-8:], '\t-> OK')
    return True
def getTimeStamps(ds_path, fileIndex):
    """Read one timestamp per frame from a tif file's frame descriptions.

    Parameters
    ----------
    ds_path : dataset folder handed to ``suite2p.io.utils.list_files``.
    fileIndex : position of the file in the listing returned for ``ds_path``.

    Returns
    -------
    numpy.ndarray of floats with one entry per frame (first axis of the
    image stack).
    """
    fs, _ = suite2p.io.utils.list_files(ds_path, False, ["*.tif", "*.tiff"])
    # The context manager closes the reader once all descriptions are read.
    with ScanImageTiffReader(fs[fileIndex]) as reader:
        # NOTE(review): the full stack is loaded only to learn the frame
        # count; a cheaper way to get it may exist in the reader API.
        n_frames = reader.data().shape[0]
        timestamps = np.empty(n_frames)
        for fr in range(n_frames):
            # The value is taken from the 4th line of the frame description,
            # formatted as "<key> = <value>" -- presumably the frame
            # timestamp entry; confirm against the acquisition software.
            line = reader.description(fr).splitlines()[3]
            timestamps[fr] = float(line.split(" = ")[1])
    return timestamps