-
Notifications
You must be signed in to change notification settings - Fork 2
/
my_connector.py
40 lines (33 loc) · 1.44 KB
/
my_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiverse_client_py import MultiverseClient, MultiverseMetaData, SocketAddress
class MyConnector(MultiverseClient):
    """Example Multiverse client that reports its activity on stdout.

    Supplies print-based ``loginfo``/``logwarn`` implementations and thin
    wrappers that log the request/response attributes around the inherited
    ``_connect_and_start`` and ``_communicate`` calls.
    """

    def __init__(self, client_addr: SocketAddress, multiverse_meta_data: MultiverseMetaData) -> None:
        """Forward the socket address and meta data unchanged to the base class."""
        super().__init__(client_addr, multiverse_meta_data)

    def loginfo(self, message: str) -> None:
        """Print *message* prefixed with ``INFO: ``."""
        line = f"INFO: {message}"
        print(line)

    def logwarn(self, message: str) -> None:
        """Print *message* prefixed with ``WARN: ``."""
        line = f"WARN: {message}"
        print(line)

    def _run(self) -> None:
        """Log a start message, then call the inherited ``_connect_and_start``."""
        self.loginfo("Start running the client.")
        self._connect_and_start()

    def send_and_receive_meta_data(self) -> None:
        """Log ``request_meta_data``, communicate, then log ``response_meta_data``."""
        outgoing = str(self.request_meta_data)
        self.loginfo("Sending request meta data: " + outgoing)
        # NOTE(review): True presumably selects the meta-data exchange --
        # confirm against MultiverseClient._communicate.
        self._communicate(True)
        # Read the response only after the exchange so the log shows the new value.
        incoming = str(self.response_meta_data)
        self.loginfo("Received response meta data: " + incoming)

    def send_and_receive_data(self) -> None:
        """Log ``send_data``, communicate, then log ``receive_data``."""
        outgoing = str(self.send_data)
        self.loginfo("Sending data: " + outgoing)
        # NOTE(review): False presumably selects the regular data exchange --
        # confirm against MultiverseClient._communicate.
        self._communicate(False)
        # Read the received data only after the exchange has completed.
        incoming = str(self.receive_data)
        self.loginfo("Received data: " + incoming)
if __name__ == "__main__":
    # Address the client uses; only the port is given explicitly here.
    client_addr = SocketAddress(port="5000")

    # World name plus unit/handedness settings handed to MultiverseMetaData.
    meta_data_settings = {
        "world_name": "my_world",
        "length_unit": "m",
        "angle_unit": "rad",
        "mass_unit": "kg",
        "time_unit": "s",
        "handedness": "rhs",
    }
    multiverse_meta_data = MultiverseMetaData(**meta_data_settings)

    # Build the connector, run it, then stop it.
    my_connector = MyConnector(client_addr, multiverse_meta_data)
    my_connector.run()
    my_connector.stop()