-
Notifications
You must be signed in to change notification settings - Fork 3
/
module.py
72 lines (53 loc) · 2.23 KB
/
module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Module (object):
def __init__ (self, adapter, regs):
self.adapter, self.regs = adapter, regs
self.poll_cache = None
self.input_num = adapter.hw.read_sr (regs.input_bit_size)
if self.input_num:
self.input_start = adapter.hw.read_sr (regs.input_bit_start)
self.output_num = adapter.hw.read_sr (regs.output_bit_size)
if self.output_num:
self.output_start = adapter.hw.read_sr (regs.output_bit_start)
def __str__ (self):
return self.adapter.hw.read_string (self.regs.product_name, 73).decode("utf-8")
def __repr__ (self):
return str(self)
def __getitem__ (self, bit):
return self.get (bit)
def __setitem__ (self, bit, high):
return self.set (bit, high)
def _ensure_valid_bit (fn):
def __ensure_valid_bit (self, bit, *args, **kwargs):
if bit < 0 or bit >= (self.input_num + self.output_num):
raise ValueError ("Bit out of range")
return fn (self, bit, *args, **kwargs)
return __ensure_valid_bit
@_ensure_valid_bit
def is_output (self, bit):
return bit >= self.input_num
@_ensure_valid_bit
def get (self, bit):
if self.is_output (bit):
return self.adapter.hw.read_coils (self.output_start + bit).bits[0]
else:
return self.adapter.hw.read_discrete_inputs (self.input_start + bit).bits[0]
@_ensure_valid_bit
def set (self, bit, high=True):
if not self.is_output (bit):
raise ValueError ("Cannot set an input bit")
self.adapter.hw.write_coil (self.output_start + bit, high)
def get_all (self):
i, o = [], []
if self.input_num:
i = self.adapter.hw.read_discrete_inputs (self.input_start, self.input_num).bits
if self.output_num:
o = self.adapter.hw.read_coils (self.output_start, self.output_num).bits
return i + o
def poll_prepare (self):
self.poll_cache = self.get_all ()
return self.poll_cache
def poll (self):
state = self.get_all ()
ret = [(i, s) for (i, (s, c)) in enumerate (zip (state, self.poll_cache)) if s != c]
self.poll_cache = state
return ret