-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmastervenus.py
executable file
·247 lines (209 loc) · 10.6 KB
/
mastervenus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
import dbus
import dbus.service
import inspect
import pprint
import os
import sys
import time
from threading import Thread
import can
import struct
# our own packages
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '../'))
from vedbus import VeDbusService
def readVersionFile():
    """Return the contents of the ``version`` file that sits next to this script.

    The path is built from the absolute location of this file, so the lookup
    does not fall back to ``/version`` when ``__file__`` carries no directory
    component. The file is opened in a ``with`` block so the handle is closed
    deterministically.

    Returns:
        The file contents with leading/trailing whitespace removed.

    Raises:
        OSError: if the ``version`` file is missing or unreadable.
    """
    scriptDir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(scriptDir, 'version'), encoding='utf-8') as versionFile:
        return versionFile.read().strip()
def createDBusEntriesForDCShunt(deviceinstance):
    """Create and populate the D-Bus service that represents the DC shunt.

    The service is registered as 'com.victronenergy.battery.masterbus_shunt0'
    on a private system bus connection.

    Args:
        deviceinstance: value published under '/DeviceInstance'.

    Returns:
        The populated VeDbusService object.
    """
    service = VeDbusService('com.victronenergy.battery.masterbus_shunt0', dbus.SystemBus(private=True))
    register = service.add_path

    # Management entries describing this process.
    register('/Mgmt/ProcessName', __file__)
    register('/Mgmt/ProcessVersion', readVersionFile())
    register('/Mgmt/Connection', 'Mastervenus')

    # Mandatory identification entries.
    register('/DeviceInstance', deviceinstance)
    register('/ProductName', 'Mastervolt Shunt')
    register('/CustomName', 'DCShunt')
    register('/FirmwareVersion', readVersionFile())
    register('/ProductId', 20)
    register('/HardwareVersion', 1)
    register('/Connected', 1)
    register('/Serial', 'Serial123')

    # Measurement entries, see https://github.com/victronenergy/venus/wiki/dbus#battery
    # They all start out as None and are filled in by the CAN message handlers.
    # '/TimeToGo' is the remaining discharge time in seconds.
    measurementPaths = (
        '/Dc/0/Voltage',
        '/Dc/0/Current',
        '/Dc/0/Power',
        '/Dc/0/Temperature',
        '/ConsumedAmphours',
        '/Soc',
        '/TimeToGo',
        '/History/MinimumVoltage',
        '/History/MaximumVoltage',
        '/History/LastDischarge',
        '/History/DeepestDischarge',
    )
    for measurementPath in measurementPaths:
        register(measurementPath, value=None)

    return service
# --- CAN arbitration id layout (as used by parseMasterbusMessage) ---
# Low 18 bits of the arbitration id identify the device.
DEVICE_ID_MASK=0x0003FFFF
# Device ids compared against (arbitration_id & DEVICE_ID_MASK).
DEVICE_ID_DC_SHUNT=0x31297
DEVICE_ID_MASSCOMBI=0x2f412
# NOTE(review): not referenced anywhere in the visible code.
DEVICE_ID_MASTERVIEW=0x0840a
# Bits 18..28 of the arbitration id; parseMasterbusMessage shifts the masked
# value right by 18 to obtain the message kind.
MESSAGE_KIND_MASK=0x1FFC0000
# --- DC shunt attribute ids (first 16-bit field of a value message) ---
ATTR_DCSHUNT_SOC=0x00
ATTR_DCSHUNT_VOLTS=0x01
ATTR_DCSHUNT_AMPS=0x02
ATTR_DCSHUNT_AMPS_CONSUMED=0x03
ATTR_DCSHUNT_TEMPERATURE=0x05
# --- MassCombi (inverter/charger) attribute ids ---
# NOTE(review): 0x020e is also the message kind handleMasscombiMessage tests
# for; confirm this is really an attribute id and not a copy/paste of the kind.
ATTR_INVERTER_STATE=0x020e
# NOTE(review): same value as ATTR_INVERTER_ON_OFF below; neither is used in
# the visible code.
ATTR_INVERTER_ON_OR_OFF=0x0014
ATTR_INVERTER_DC_VOLTAGE_IN=0x06
ATTR_INVERTER_DC_AMPS_IN=0x07
ATTR_CHARGER_AC_VOLTAGE_IN=0x08
ATTR_CHARGER_AC_AMPS_IN=0x09
ATTR_INVERTER_AC_VOLTS_OUT=0x0A
ATTR_INVERTER_AC_AMPS_OUT=0x0B
ATTR_INVERTER_INPUT_GENSET=0x0E
# NOTE(review): the XXX_ prefix presumably marks an unconfirmed/retired
# candidate for the inverter state attribute - verify before use.
XXX_ATTR_INVERTER_STATE=0x10
ATTR_INVERTER_LOAD_PERCENT=0x11
ATTR_CHARGER_STATE=0x12
ATTR_INVERTER_SHORE_FUSE=0x13
ATTR_INVERTER_ON_OFF=0x14
ATTR_INVERTER_ENTER_OFF=0x15
ATTR_MASSCOMBI_POWER_STATE=0x38
ATTR_CHARGER_SWITCH_STATE=0x3A
ATTR_CHARGER_MODE=0x3C
def recomputeTimeToGo():
    """Update '/TimeToGo' (seconds of discharge remaining) on the DC shunt service.

    Reads '/Dc/0/Current', '/Soc' and '/ConsumedAmphours' from the module-level
    ``dcshunt_dbusservice``:

    * If any of the three is still None, nothing is changed.
    * If the battery is not discharging (current >= 0) or is full
      (Soc >= 100), '/TimeToGo' is set to None.
    * Otherwise the remaining charge is estimated and divided by the
      discharge current.

    The estimate works as follows: the consumed amp-hours correspond to the
    (100 - Soc) percent already used, so the charge still available is
    consumed * Soc / (100 - Soc). The previous implementation divided the
    estimated *total* capacity by the current, which overstated the time left.
    Magnitudes are used so the result is a positive duration regardless of
    whether consumed amp-hours are reported as positive or negative.
    """
    current = dcshunt_dbusservice['/Dc/0/Current']
    soc = dcshunt_dbusservice['/Soc']
    consumedAmpHours = dcshunt_dbusservice['/ConsumedAmphours']

    if current is None or soc is None or consumedAmpHours is None:
        return

    if current >= 0 or soc >= 100:
        dcshunt_dbusservice['/TimeToGo'] = None
        return

    # soc < 100 here, so the divisor is strictly positive; current < 0, so
    # abs(current) is non-zero.
    remainingAmpHours = abs(consumedAmpHours) * soc / (100.0 - soc)
    hoursLeft = remainingAmpHours / abs(current)
    dcshunt_dbusservice['/TimeToGo'] = hoursLeft * 3600
def handleDCShuntMessage(message, messageKind):
    """Apply one CAN message from the DC shunt to the shunt D-Bus service.

    Args:
        message: received CAN message; ``dlc`` and ``data`` are read.
        messageKind: message kind extracted from the arbitration id.

    Behaviour:
        * Messages with dlc == 2 are ignored, as are all messages received
          before the module-level ``dcshunt_dbusservice`` exists.
        * Kind 0x21b carries a little-endian (uint16 attribute id, float32
          value) pair, which is written to the matching D-Bus path. Payloads
          that are not exactly that size are ignored instead of letting
          ``struct.error`` propagate and terminate the polling thread.
        * Kind 0x19b (string label messages) is ignored.
        * Any other kind with dlc > 4 is printed for diagnosis.
    """
    if message.dlc == 2:
        return
    if 'dcshunt_dbusservice' not in globals():
        return

    if messageKind == 0x21b:
        valueFormat = '<Hf'
        if len(message.data) != struct.calcsize(valueFormat):
            # Not an (attribute id, float) frame; nothing to decode.
            return
        attributeId, floatValue = struct.unpack(valueFormat, message.data)
        print('\tdcshunt (attributeId=0x%X, floatValue=%f)' % (attributeId, floatValue))

        service = dcshunt_dbusservice
        if attributeId == ATTR_DCSHUNT_SOC:
            service['/Soc'] = round(floatValue, 0)
        elif attributeId == ATTR_DCSHUNT_VOLTS:
            service['/Dc/0/Voltage'] = round(floatValue, 2)
            # Power can only be derived once a current reading is available.
            if service['/Dc/0/Current'] is not None:
                service['/Dc/0/Power'] = round(floatValue * service['/Dc/0/Current'])
            # Track the voltage extremes seen since start-up.
            if service['/History/MinimumVoltage'] is None or service['/History/MinimumVoltage'] > floatValue:
                service['/History/MinimumVoltage'] = round(floatValue, 2)
            if service['/History/MaximumVoltage'] is None or service['/History/MaximumVoltage'] < floatValue:
                service['/History/MaximumVoltage'] = round(floatValue, 2)
        elif attributeId == ATTR_DCSHUNT_AMPS:
            service['/Dc/0/Current'] = round(floatValue, 2)
            # Power can only be derived once a voltage reading is available.
            if service['/Dc/0/Voltage'] is not None:
                service['/Dc/0/Power'] = round(floatValue * service['/Dc/0/Voltage'])
        elif attributeId == ATTR_DCSHUNT_AMPS_CONSUMED:
            service['/ConsumedAmphours'] = round(floatValue, 2)
            if service['/History/LastDischarge'] is None or service['/History/LastDischarge'] < floatValue:
                service['/History/LastDischarge'] = round(floatValue, 2)
            recomputeTimeToGo()
        elif attributeId == ATTR_DCSHUNT_TEMPERATURE:
            service['/Dc/0/Temperature'] = round(floatValue, 1)
    elif messageKind == 0x19b:
        # String label messages: intentionally not handled.
        pass
    elif message.dlc > 4:
        print('No dcshunt handling for message kind 0x%X' % (messageKind))
        print(message)
def createDBusEntriesForMassCombi(deviceinstance):
    """Create and populate the D-Bus service that represents the MassCombi.

    The service is registered as
    'com.victronenergy.inverter.masterbus_masscombi1' on a private system bus
    connection.

    Args:
        deviceinstance: value published under '/DeviceInstance'.

    Returns:
        The populated VeDbusService object.
    """
    service = VeDbusService('com.victronenergy.inverter.masterbus_masscombi1', dbus.SystemBus(private=True))
    register = service.add_path

    # Management entries describing this process.
    register('/Mgmt/ProcessName', __file__)
    register('/Mgmt/ProcessVersion', readVersionFile())
    register('/Mgmt/Connection', 'Mastervenus')

    # Mandatory identification entries. The custom name, firmware/hardware
    # versions and serial are placeholders that should eventually come from
    # configuration.
    register('/DeviceInstance', deviceinstance)
    register('/ProductId', 0)
    register('/ProductName', 'Mastervolt masscombi 4000W')
    register('/CustomName', 'Inverter Charger')
    register('/FirmwareVersion', 0)
    register('/HardwareVersion', 0)
    register('/Serial', 'MassCombiSerial')
    register('/Connected', 1)

    # Inverter entries, see https://github.com/victronenergy/venus/wiki/dbus#inverter
    # '/Dc/0/Current' is deliberately not published at the moment.
    register('/Dc/0/Voltage', value=None)
    register('/Ac/Out/L1/V', value=120)
    register('/Ac/Out/L1/I', value=None)
    register('/Mode', value=2)
    register('/State', value=9)

    return service
def handleMasscombiMessage(message, messageKind):
    """Apply one CAN message from the MassCombi to the inverter D-Bus service.

    Only message kind 0x020e is handled; it carries a little-endian
    (uint16 attribute id, float32 value) pair.

    Args:
        message: received CAN message; ``data`` is read.
        messageKind: message kind extracted from the arbitration id.

    Messages are ignored when the module-level ``masscombi_dbusservice`` does
    not exist yet (same guard as the DC shunt handler) or when the payload is
    not exactly the size of the value pair, so a malformed frame cannot raise
    ``struct.error`` and terminate the polling thread.
    """
    if messageKind != 0x020e:
        return
    if 'masscombi_dbusservice' not in globals():
        return

    valueFormat = '<Hf'
    if len(message.data) != struct.calcsize(valueFormat):
        # Not an (attribute id, float) frame; nothing to decode.
        return

    attributeId, floatValue = struct.unpack(valueFormat, message.data)
    print('\tmasscombi (attributeId=0x%X, floatValue=%f)' % (attributeId, floatValue))

    service = masscombi_dbusservice
    if attributeId == ATTR_INVERTER_STATE:
        # TODO: derive these from floatValue instead of hard-coding them.
        service['/Mode'] = 2   # Switch position: 2=Inverter on; 4=Off; 5=Low Power/ECO
        service['/State'] = 9  # 0=Off; 1=Low Power; 2=Fault; 9=Inverting
    elif attributeId == ATTR_INVERTER_AC_AMPS_OUT:
        service['/Ac/Out/L1/I'] = round(floatValue, 2)
    elif attributeId == ATTR_INVERTER_DC_VOLTAGE_IN:
        service['/Dc/0/Voltage'] = round(floatValue, 2)
    # ATTR_INVERTER_DC_AMPS_IN is not forwarded: '/Dc/0/Current' is not
    # published by the MassCombi service.
def parseMasterbusMessage(message):
    """Route a received CAN message to the handler for its originating device.

    The device id is the part of the arbitration id selected by
    DEVICE_ID_MASK; the message kind is the part selected by
    MESSAGE_KIND_MASK, shifted down by 18 bits. Messages from devices other
    than the DC shunt and the MassCombi are dropped silently.

    Filtering on ``message.is_rx`` is currently disabled, so every message
    yielded by the bus is considered.
    """
    arbitrationId = message.arbitration_id
    sender = arbitrationId & DEVICE_ID_MASK
    kind = (arbitrationId & MESSAGE_KIND_MASK) >> 18

    if sender == DEVICE_ID_DC_SHUNT:
        handleDCShuntMessage(message, kind)
        return
    if sender == DEVICE_ID_MASSCOMBI:
        handleMasscombiMessage(message, kind)
def handleDBusEvents(mainloop):
    """Poll the Masterbus devices over CAN and forward every received message.

    Opens the 'can1' socketcan interface, schedules three groups of periodic
    request messages, then iterates over the bus and passes each received
    message to parseMasterbusMessage.

    Args:
        mainloop: the GLib main loop; currently unused by this function.

    On can.exceptions.CanOperationError a notice is printed and the whole
    process is terminated with exit status 1 via os._exit.
    """
    shuntRequestId = 0x186F1297
    masscombiRequestId = 0x183AF412

    # (arbitration id of the request, period in seconds, attributes to request)
    pollingPlan = (
        (shuntRequestId, 1.0, (ATTR_DCSHUNT_VOLTS, ATTR_DCSHUNT_AMPS)),
        (shuntRequestId, 10.0, (ATTR_DCSHUNT_SOC, ATTR_DCSHUNT_AMPS_CONSUMED, ATTR_DCSHUNT_TEMPERATURE)),
        (masscombiRequestId, 1.0, (ATTR_INVERTER_DC_VOLTAGE_IN, ATTR_INVERTER_AC_AMPS_OUT)),
    )

    try:
        bus = can.Bus(channel='can1', interface='socketcan')

        for requestId, periodSeconds, attributeIds in pollingPlan:
            requests = [
                makeMastervoltRequestMessage(requestId, attributeId)
                for attributeId in attributeIds
            ]
            bus.send_periodic(requests, periodSeconds)

        # Iterating the bus yields messages as they are received.
        for received in bus:
            parseMasterbusMessage(received)
    except can.exceptions.CanOperationError:
        print("Got CanOperationError. Exit and exit")
        # Hard exit of the whole process from this worker thread.
        os._exit(1)
def makeMastervoltRequestMessage(deviceId,itemId):
    """Build the CAN message that requests one attribute from a device.

    Args:
        deviceId: arbitration id to send the request to (extended id).
        itemId: attribute id, encoded as a 2-byte little-endian payload.

    Returns:
        A can.Message with an extended arbitration id and a 2-byte payload.
    """
    payload = itemId.to_bytes(length=2, byteorder='little')
    return can.Message(
        arbitration_id=deviceId,
        data=payload,
        is_extended_id=True,
    )
def mainForwardMasterbusToDbus():
    """Start the Masterbus-to-D-Bus bridge and block in the GLib main loop.

    Prints a start-up banner and the version, installs the GLib-based D-Bus
    main loop as default, creates the DC shunt (device instance 10) and
    MassCombi (device instance 11) services as module-level globals, starts
    the CAN polling thread as a daemon, and finally runs the main loop.
    """
    # dbusObjects is declared global but never assigned in this function.
    global dbusObjects, dcshunt_dbusservice, masscombi_dbusservice

    print(__file__ + " starting up")
    print(readVersionFile())

    # A main loop is needed to send/receive asynchronous calls to and from dbus.
    DBusGMainLoop(set_as_default=True)

    dcshunt_dbusservice = createDBusEntriesForDCShunt(deviceinstance=10)
    masscombi_dbusservice = createDBusEntriesForMassCombi(deviceinstance=11)

    mainloop = GLib.MainLoop()

    # Daemon thread: it does not keep the process alive once the main loop ends.
    canPoller = Thread(target=handleDBusEvents, args=(mainloop,), daemon=True)
    canPoller.start()

    mainloop.run()
# Script entry point: there is no __main__ guard, so the bridge starts (and
# blocks in the main loop) as soon as this module is executed or imported.
mainForwardMasterbusToDbus()
# Without a terminator, I get 2000 CAN bus messages in 81 seconds (measured with: time candump -n 2000 can1).
# With a 120 Ohm terminator added between CAN H and CAN L: TODO - the result of this measurement was not recorded here.