-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathclaralogConverter.py
149 lines (136 loc) · 6.99 KB
/
claralogConverter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# claralogConverter
#
# This little helper tool reads a text log file containing V2G messages (e.g. from https://github.com/uhi22/ccs32clara) and
# interprets the content of the EXI data.
#
# Preconditions:
# 1. You have a log file which contains V2G traffic (e.g. from https://openinverter.org/forum/viewtopic.php?p=64655#p64655)
# 2. You cloned and compiled the OpenV2Gx EXI decoder from https://github.com/uhi22/OpenV2Gx
#
# Limitations:
# - Only DIN is supported at the moment.
# - The script treats all V2G EXI messages as DIN messages. This means, the ApplHandshake messages at
# the begin of the charging session will lead to wrong or not decoded data.
# - The path where the script look for pcap files needs to be configured in the code.
#
# Possible improvements / Todos:
# - Show also the SLAC, NeigborDiscovery and SDP.
# - Add flexibility to also decode the ApplHandshake messages.
# - Add ISO support.
# - Configure the path where to look for pcap files via command line
#
import exiConnector
import os
from helpers import combineValueAndMultiplier
import json
# The path where the script will search for pcap files:
directory = '../clara-logs'
# stop the evaluation after this number of packets. Set to zero to have no limit.
nLimitNumberOfPackets = -1
# Examples of the decoder result:
#"EVSEPresentVoltage.Multiplier": "0",
#"EVSEPresentVoltage.Value": "318",
#"EVSEPresentVoltage.Unit": "V",
#"DC_EVStatus.EVRESSSOC": "53",
def convertClaralogToTxt(inputFileName):
global nLimitNumberOfPackets
global directory
outputFileName = inputFileName + '.decoded.txt'
# todo: if output file exists
if (os.path.isfile(os.path.join(directory, outputFileName))):
print("output file " + outputFileName + " already exists. Nothing to do.")
return
fileIn = open(inputFileName, 'r', encoding="Latin-1")
fileOut = open(outputFileName, 'w', encoding="Latin-1")
print("# generated by claralogConverter.py", file=fileOut)
print("# https://github.com/uhi22/pyPLC", file=fileOut)
fileOutValues = open(inputFileName + '.values.txt', 'w', encoding="Latin-1")
print("# generated by claralogConverter.py", file=fileOutValues)
print("# https://github.com/uhi22/pyPLC", file=fileOutValues)
fileOutStatistics = open(directory + '/pcap_statistics.txt', 'a')
print("# statistics for " + inputFileName, file=fileOutStatistics)
t1CableCheckBegin = 0
t2PreChargeBegin = 0
t3CurrentDemandBegin = 0
numberOfPackets=0
Lines = fileIn.readlines()
for line in Lines:
numberOfPackets+=1
#print(packet)
print(line.strip(), file=fileOut)
lineWithoutBlanks = line.replace(" ", "")
posTcpPayload = lineWithoutBlanks.find(":01fe800100")
if posTcpPayload>0:
# we found the V2GTP header.
tcppayload = lineWithoutBlanks[posTcpPayload + 1:] # everything until the end of the line is the data
# this gives a string of hex values, separated by " ", e.g. "01fe8001"
s = tcppayload
if (s[0:8]=="01fe8001"):
# it is a V2GTP header with EXI content
strExi = s[16:] # remove V2GTP header (8 bytes, means 16 hex characters)
pre = "DD" # decode DIN
decoded=exiConnector.exiDecode(strExi, pre)
#print(decoded)
print(decoded, file=fileOut)
#print(decoded)
strTimeStamp = line[line.find("[")+1 : line.find("]")]
#print(strTimeStamp)
try:
jsondict = json.loads(decoded)
try:
u = combineValueAndMultiplier(jsondict["EVSEPresentVoltage.Value"], jsondict["EVSEPresentVoltage.Multiplier"])
print("[" + strTimeStamp + "] EVSEPresentVoltage=" + str(u), file=fileOutValues)
i = combineValueAndMultiplier(jsondict["EVSEPresentCurrent.Value"], jsondict["EVSEPresentCurrent.Multiplier"])
print("[" + strTimeStamp + "] EVSEPresentCurrent=" + str(i), file=fileOutValues)
except:
pass
try:
u = combineValueAndMultiplier(jsondict["EVTargetVoltage.Value"], jsondict["EVTargetVoltage.Multiplier"])
print("[" + strTimeStamp + "] EVTargetVoltage=" + str(u), file=fileOutValues)
i = combineValueAndMultiplier(jsondict["EVTargetCurrent.Value"], jsondict["EVTargetCurrent.Multiplier"])
print("[" + strTimeStamp + "] EVTargetCurrent=" + str(i), file=fileOutValues)
except:
pass
try:
soc = jsondict["DC_EVStatus.EVRESSSOC"]
print("[" + strTimeStamp + "] EVRESSSOC=" + str(soc), file=fileOutValues)
except:
pass
try:
soc = jsondict["DC_EVSEStatus.EVSEStatusCode"]
print("[" + strTimeStamp + "] EVSEStatusCode=" + str(soc), file=fileOutValues)
except:
pass
except:
pass
if ((numberOfPackets % 100)==0):
print(str(numberOfPackets) + " packets")
if ((nLimitNumberOfPackets>0) and (numberOfPackets>=nLimitNumberOfPackets)):
break
# Statistics of the timing:
#print("t1CableCheckBegin " + str(t1CableCheckBegin))
#print("t2PreChargeBegin " + str(t2PreChargeBegin))
#print("t3CurrentDemandBegin " + str(t3CurrentDemandBegin))
if ((t1CableCheckBegin>0) and (t2PreChargeBegin>t1CableCheckBegin) and (t3CurrentDemandBegin>t2PreChargeBegin)):
print("charger MAC " + chargerMAC + " " + getManufacturerFromMAC(chargerMAC))
timeForCableCheck = t2PreChargeBegin - t1CableCheckBegin
timeForPreCharge = t3CurrentDemandBegin - t2PreChargeBegin
print("timeForCableCheck= " + ("%.3f" % timeForCableCheck))
print("timeForPreCharge= " + ("%.3f" % timeForPreCharge))
print(chargerMAC + ";" + getManufacturerFromMAC(chargerMAC) + ";" + \
"timeForCableCheck;" + ("%.3f" % timeForCableCheck) + ";" + \
"timeForPreCharge; " + ("%.3f" % timeForPreCharge), file=fileOutStatistics)
fileOutStatistics.close()
fileOut.close()
fileOutValues.close()
fileIn.close()
# iterate over files in the directory
for filename in os.listdir(directory):
strFileNameWithPath = os.path.join(directory, filename)
# checking if it is a file
if os.path.isfile(strFileNameWithPath):
print(strFileNameWithPath)
# check the file extension:
if (strFileNameWithPath[-9:]==".claralog"):
print("Will decode " + strFileNameWithPath)
convertClaralogToTxt(strFileNameWithPath)