-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
88 lines (72 loc) · 2.28 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import sys
import time
import os
import logging
from watchdog.observers import Observer
from watchdog.events import RegexMatchingEventHandler
# Module-wide logging: records at INFO and above are appended to
# 'exp-watcher.log' (a relative path, so it is resolved against the working
# directory of the process) using a "<time>: <logger> ~ <level>: <message>"
# layout.
formatter = logging.Formatter(
    fmt='%(asctime)s: %(name)s ~ %(levelname)s: %(message)s'
)

# Build the file handler completely before attaching it to the logger.
handler = logging.FileHandler('exp-watcher.log')
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class ExposureWatcher:
    """Watch a directory tree and route its events to an ExposureHandler.

    The watcher owns one watchdog ``Observer`` and one ``ExposureHandler``.
    Use ``start()``/``stop()`` to control it directly, or ``run()`` to start
    it and block until the module-level ``stop_thread`` flag becomes true.
    """

    def __init__(self, src_path):
        """Create the handler and the observer; nothing is watched yet.

        Args:
            src_path: Directory to watch. It is scheduled recursively when
                ``start()`` is called.
        """
        self.__src_path = src_path
        self.__event_handler = ExposureHandler()
        self.__event_observer = Observer()

    def run(self):
        """Start watching and block until asked to stop.

        The loop polls the module-level ``stop_thread`` flag about once per
        second. Any exception raised while waiting is logged, and the
        observer is always stopped before this method returns.
        """
        self.start()
        try:
            # Re-read the flag on every pass so another thread can flip it.
            # The flag is only created by the script entry point, so treat a
            # missing flag as "keep running" instead of failing with a
            # NameError when this module is imported and used as a library.
            while not globals().get("stop_thread", False):
                time.sleep(1)
        except Exception:
            logger.exception("File watcher interrupted")
        finally:
            self.stop()

    def start(self):
        """Schedule the handler on the watched path and start the observer."""
        self.__schedule()
        self.__event_observer.start()
        logger.info('File watcher started!')

    def stop(self):
        """Stop the observer and wait for its thread to finish."""
        self.__event_observer.stop()
        self.__event_observer.join()
        logger.info('File watcher stoped!')

    def __schedule(self):
        """Register the event handler for the source path, recursively."""
        self.__event_observer.schedule(
            self.__event_handler,
            self.__src_path,
            recursive=True
        )
class ExposureHandler(RegexMatchingEventHandler):
    """Greet, in the log, every newly created path that matches FILE_REGEX."""

    # Matches every path. Use [r".*\.fits$"] instead to react only to paths
    # ending in ".fits".
    FILE_REGEX = [r".*"]

    # Seconds to wait between two size checks while a file is being written.
    POLL_INTERVAL = 2

    def __init__(self):
        """Register FILE_REGEX as the patterns this handler reacts to."""
        # Pass the patterns by keyword: it is accepted whether the base
        # class takes them positionally or as keyword-only arguments.
        super().__init__(regexes=self.FILE_REGEX)

    def on_created(self, event):
        """Wait until the created path stops growing, then process it.

        The size is sampled every POLL_INTERVAL seconds and the path is
        considered complete once two consecutive samples agree. This blocks
        the calling thread for at least one interval.

        Args:
            event: The creation event; only ``event.src_path`` is used.
        """
        try:
            file_size = -1
            while file_size != os.path.getsize(event.src_path):
                file_size = os.path.getsize(event.src_path)
                time.sleep(self.POLL_INTERVAL)
        except OSError:
            # The path vanished or became unreadable while we were waiting
            # (for example a short-lived temporary file). Skip it rather
            # than let the error escape from the callback.
            logger.warning(
                "Could not read the size of %s; skipping it",
                event.src_path,
                exc_info=True,
            )
            return
        self.process(event)

    def process(self, event):
        """Log a welcome message carrying the base name of the new path.

        Args:
            event: The creation event; only ``event.src_path`` is used.
        """
        filename = os.path.basename(event.src_path)
        logger.info("Welcome %s exposure! :)", filename)
if __name__ == "__main__":
    # Demo entry point: watch the directory given as the first command-line
    # argument (default: the current directory) for five seconds.
    import threading

    src_path = sys.argv[1] if len(sys.argv) > 1 else '.'
    exp = ExposureWatcher(src_path)

    # Polled by ExposureWatcher.run(); setting it to True ends the watcher.
    stop_thread = False
    t1 = threading.Thread(target=exp.run)
    t1.start()
    try:
        # Simulate the arrival of exposures: this is the time window in
        # which to copy the test exposures into the 'src_path' directory.
        time.sleep(5)
    finally:
        # Always release the watcher thread, even if the wait above is
        # interrupted; otherwise the non-daemon thread keeps the process
        # alive. Join so that 'Bye' is printed after the watcher stopped.
        stop_thread = True
        t1.join()
    print('Bye')