-
Notifications
You must be signed in to change notification settings - Fork 0
/
lc.py
101 lines (82 loc) · 3.03 KB
/
lc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from collections import defaultdict
import os
import itertools
import reprlib
def lc_reader(filename):
lclist=[]
meta_data = []
with open(filename) as fp:
for line in fp:
if line.find('#')!=0:
lclist.append([float(f) for f in line.strip().split()])
else:
#read and store the sentences begin with '#'
temp = [f for f in line.strip().split()]
temp[0]=temp[0][1:]
meta_data.append(temp)
#parse the first two meaningful lines, facet names and facet values
facets_dict = dict(zip(meta_data[0], meta_data[1]))
return lclist, facets_dict
class LightCurve:
def __init__(self, data, metadict):
self._time = [x[0] for x in data]
self._amplitude = [x[1] for x in data]
self._error = [x[2] for x in data]
self.metadata = metadict
self.filename = None
#add _timeseries
self._timeseries = list(zip(self._time, self._amplitude))
@classmethod
def from_file(cls, filename):
lclist, metadict = lc_reader(filename)
instance = cls(lclist, metadict)
instance.filename = filename
return instance
def __repr__(self):
class_name = type(self).__name__
components = reprlib.repr(list(itertools.islice(self.timeseries,0,10)))
components = components[components.find('['):]
return '{}({})'.format(class_name, components)
#add sequence protocol
def __getitem__(self, pos):
return self._timeseries[pos]
def __len__(self):
return len(self._timeseries)
@property
def time(self):
return self.__time
@property
def amplitude(self):
return self.__amplitude
# already add _timeseries in __init__
@property
def timeseries(self):
return self._timeseries
class LightCurveDB:
def __init__(self):
self._collection={}
self._field_index=defaultdict(list)
self._tile_index=defaultdict(list)
self._color_index=defaultdict(list)
def populate(self, folder):
for root,dirs,files in os.walk(folder): # DEPTH 1 ONLY
for file in files:
if file.find('.mjd')!=-1:
the_path = root+"/"+file
self._collection[file] = LightCurve.from_file(the_path)
def index(self):
for f in self._collection:
lc, tilestring, seq, color, _ = f.split('.')
field = int(lc.split('_')[-1])
tile = int(tilestring)
self._field_index[field].append(self._collection[f])
self._tile_index[tile].append(self._collection[f])
self._color_index[color].append(self._collection[f])
#your code here
def retrieve(self, facet, value):
if facet == 'tile':
return self._tile_index[value]
elif facet == 'field':
return self._field_index[value]
elif facet == 'color':
return self._color_index[value]