-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkvaser.py
70 lines (60 loc) · 2.17 KB
/
kvaser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import sink
import sys
import struct
from ctypes import *
# Errors
class KvaserError(Exception):
pass
class Kvaser(object):
def __init__(self):
if sys.platform.startswith('win'):
self.dll = WinDLL('canlib32')
self.dll.canInitializeLibrary()
else:
self.dll = CDLL('libcanlib.so')
# Open CAN channel.
self.handle = self.dll.canOpenChannel(c_int(0), c_int(0))
res = self.dll.canBusOn(c_int(self.handle))
if res < 0:
raise KvaserError('Error initializing, {}'.format(res))
def reset(self):
pass
def _transmit(self, address, data):
ctypes_data = (c_ubyte * 8).from_buffer_copy(data)
res = self.dll.canWrite(
c_int(self.handle),
c_int(address),
pointer(ctypes_data),
c_int(8),
c_int(0))
def sink(self, telemetry):
msg = struct.pack('hhhh',
telemetry[sink.TYPE_RPM],
telemetry[sink.TYPE_THROTTLE] / 0.001,
telemetry[sink.TYPE_MANIFOLD_PRESSURE] / 0.1,
0)
self._transmit(0x520, msg)
msg = struct.pack('hhhh',
0,
0,
0,
telemetry[sink.TYPE_SPEED] / 0.1)
self._transmit(0x522, msg)
msg = struct.pack('hhhh',
0,
0,
0,
telemetry[sink.TYPE_BRAKE] / 0.0001)
self._transmit(0x524, msg)
msg = struct.pack('hhhh',
telemetry[sink.TYPE_VOLTAGE] / 0.01,
telemetry[sink.TYPE_OIL_PRESSURE] / 0.1,
telemetry[sink.TYPE_OIL_TEMP] / 0.1,
telemetry[sink.TYPE_WATER_TEMP] / 0.1)
self._transmit(0x530, msg)
msg = struct.pack('hhhh',
0,
telemetry[sink.TYPE_LONG_ACCEL] / 0.1,
telemetry[sink.TYPE_LAT_ACCEL] / 0.1,
0)
self._transmit(0x531, msg)