-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathpymodbus_test.py
119 lines (103 loc) · 2.86 KB
/
pymodbus_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# %%
from pymodbus.client import ModbusTcpClient
import yaml
# %%
# configuration
CFG = {
"Waveshare": {
"IP": "192.168.4.40",
"PORT": 502,
},
"S2-WL": {
"IP": "192.168.4.240",
"PORT": 502,
},
"DLS-L": {
"IP": "192.168.4.79",
"PORT": 8899,
},
} # Update with your inverter IP
devices = [
# 'S2-WL',
"Waveshare",
]
input_registers = [
{
"desc": "Inverter Temperature",
"addr": 33093,
"len": 1,
"scale": 0.1,
"uom": "C",
},
{
"desc": "Grid Voltage",
"addr": 33251,
"len": 1,
"scale": 0.1,
"uom": "V",
},
{
"desc": "Battery SOC",
"addr": 33139,
"len": 1,
"scale": 1,
"uom": "%",
},
{
"desc": "Battery Storage Mode (R)",
"addr": 33132,
"len": 1,
"scale": 1,
"uom": "",
},
]
# %%
def secret_constructor(loader, node):
# Process the node and return the corresponding Python object
return None
yaml.add_constructor("!secret", secret_constructor)
with open("solis.yaml", "r") as file:
data = yaml.load(file, Loader=yaml.FullLoader)
sensors = data[0]["sensors"]
# %%
# Enable values to be signed (default is False).
sock = {}
for device in devices:
client = ModbusTcpClient(host=CFG[device]["IP"], port=CFG[device]["PORT"])
if client.connect():
str = f"Device {device} connected at {CFG[device]['IP']}:{CFG[device]['PORT']}"
print(str)
print("-" * len(str) + "\n")
for j, sensor in enumerate(sensors[:8]):
# fmt=f"8.{max(int(-np.log10(sensor['scale'])),0)}f"
if sensor["input_type"] == "input":
result = client.read_input_registers(
sensor["address"], slave=sensor["slave"]
)
elif sensor["input_type"] == "holding":
result = client.read_holding_registers(
sensor["address"], slave=sensor["slave"]
)
for i, register in enumerate(result.registers):
print(
f"{j:>3d} {(sensor['name']+f' [Byte {i}]:'):60s}{register*sensor['scale']:6.1f} {sensor.get('unit_of_measurement')}"
)
print("\n\n")
else:
print(
f"Failed to connect to Device {device} at {device['IP']}:{device['PORT']}"
)
client.close()
# %%
device = 'Waveshare'
client = ModbusTcpClient(host=CFG[device]["IP"], port=CFG[device]["PORT"])
print(client.connect())
# %%
result = client.read_input_registers(33022, count=6, slave=1)
print(result.registers)
client = ModbusTcpClient(host=CFG[device]["IP"], port=CFG[device]["PORT"])
result = client.read_holding_registers(43000, count=6, slave=1)
print(result.registers)
# %%
result = client.read_holding_registers(43151, count=40, slave=1)
# %%