-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
132 lines (100 loc) · 3.65 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import sys
from datetime import datetime
from .events import Events, TimedEvents
from . import command
class App(object):
def __init__(self):
self.events = Events()
self.timed_events = TimedEvents()
def run(self, debug=False):
for event in self.events + self.timed_events:
command.command("LADDERLOG_WRITE_%s 1" % event.trigger)
self.running = True
try:
while self.running:
s = sys.stdin.readline()
if not s:
break
self._line = s.strip().split()
trigger, args = self._line[0], self._line[1:]
events = self.events.get(trigger)
if events:
for event in events:
params = self.parse_callback_params(event, args)
event.callback(**params)
timed_events = self.timed_events.get(trigger)
if timed_events:
for t in timed_events:
t.restart()
if debug:
self.print_debug_info()
sys.stdout.flush()
self.before_exit()
except KeyboardInterrupt:
self.before_exit()
except TypeError as e:
command.comment("TypeError: %s " % e)
def parse_callback_params(self, event, args):
params = dict()
i = 0
for param, value in zip(event.params, args):
param = param.strip('<>')
parts = param.split(':')
if len(parts) > 1:
eval(parts[0])
if param.endswith('...'):
params[param.strip('...')] = ' '.join(args[i:])
break
"""
Maybe evaluate as python in the future.
if ':' in param:
parts = param.split(':', 2)
fn, param = parts[0], param[1]
eval(fn)(param)
"""
if param.startswith('int'):
param = param.split(':')[1]
value = parse_int(value)
if param.startswith('float'):
param = param.split(':')[1]
value = parse_float(value)
params[param] = value
i += 1
return params
def get_line(self):
return self._line
def print_debug_info(self):
"""Prints the attributes of this object to the console"""
command.command("# %s" % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
for key, value in self.__dict__.items():
command.comment("%s: %s" % (key, value))
def stop(self):
self.running = False
def event(self, e):
"""Decorator for adding LadderLog events to the App."""
def decorator(callback, *args, **kwargs):
args = e.split()
trigger, params = args[0], args[1:]
self.events.add(trigger, callback, params)
return callback
return decorator
def timed_event(self, e, seconds, periodic=False, name=None):
"""Decorator for adding timed events to the App."""
def decorator(callback):
self.timed_events.add(e, seconds, callback, periodic, name)
return callback
return decorator
def before_exit(self):
pass
def parse_int(string):
"""Returns 0 if string can't be converted to int."""
try:
return int(string)
except ValueError:
return 0
def parse_float(string):
"""Returns 0 if string can't be converted to float."""
try:
return float(string)
except ValueError:
return 0