This repository has been archived by the owner on Apr 15, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservo_test.py
120 lines (95 loc) · 3.68 KB
/
servo_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#! /usr/bin/python3
import socket
from os import system
from proto.pb_pb2 import *
import binascii
import asyncio
from time import sleep
from collections import namedtuple
SERVOS_PORT_BASE = 32000 # base port
SERVOS_ID_BASE = 0x010 # base CAN ID
TX_ID_OFFSET = 0x400 # Offset to board ID for TX ID (most significant bit at 1)
MSG_SERVO = BusMessage(servo=ServoMsg())
MSG_PUMP = BusMessage(pumpAndValve=PumpAndValveMsg())
ORDER_SERVO = 0
ORDER_PUMP = 1
# Big servo for high/low positions
SERVO_ANGLES_BIG_LOW = [40, 10, 30, 30, 30] # left r right
SERVO_ANGLES_BIG_HIGH = [180, 150, 160, 160, 160] # left r right
# Middle knee servo for high/lower position
SERVO_ANGLES_MID_LOW = [0, 0, 0, 0, 0] # left r right
SERVO_ANGLES_MID_HIGH = [120, 120, 120, 120, 120] # left r right
# gripper open/close
SERVO_ANGLES_OPEN = [180, 180, 180, 180, 180]
SERVO_ANGLES_CLOSE = [20, 20, 20, 20, 20]
def send_order(sock, type, id, value):
msg = None
if type == ORDER_SERVO :
print("Servo order : ", id, value)
msg = MSG_SERVO
msg.servo.id = id
msg.servo.angle = value
elif type == ORDER_PUMP:
print("Pump order: ", id, value)
msg = MSG_PUMP
msg.pumpAndValve.id = id
print(value)
if(value == 1):
msg.pumpAndValve.on = True
elif(value == 0):
msg.pumpAndValve.on = False
else:
print("Error : For a pump board, the value to send is 1 or 0 only")
return
else:
print("Error : type of order should be ORDER_SERVO or ORDER_PUMP")
return
data = msg.SerializeToString()
data_sent = b'<' + binascii.hexlify(data) + b'>\n'
sock.send(data_sent)
def send_orders_servos(sockets, servo_id, angles):
for i in range(len(sockets)):
send_order(sockets[i], ORDER_SERVO, servo_id, angles[i])
def main():
board = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
board.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
boards = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for _ in range(5)]
for i, sock in enumerate(boards):
port = SERVOS_PORT_BASE + i
can_rx = SERVOS_ID_BASE + i
can_tx = can_rx | TX_ID_OFFSET
can_rx = format(can_rx, 'x')
can_tx = format(can_tx, 'x')
print("CAN IDs:", can_rx, can_tx)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print("Starting isotpserver : port/rx/tx:", port, can_rx, can_tx)
system("isotpserver -l "+ str(port) + " -s " + can_tx + " -d " + can_rx + " can0 &")
sleep(0.1)
print("Trying to connect to localhost:", port)
sock.connect(("localhost", port))
print("Sockets ready")
help_str = "Ordres: \n\tO= open grabbers, T = take cups, R = reset position, ? = this message"
print(help_str)
try:
while(True):
order = input()
if order == "?":
print(help_str)
else:
if order == "O":
send_orders_servos(boards, 0, SERVO_ANGLES_OPEN)
elif order == "T":
send_orders_servos(boards, 2, SERVO_ANGLES_BIG_LOW)
sleep(1)
send_orders_servos(boards, 1, SERVO_ANGLES_MID_LOW)
sleep(1)
send_orders_servos(boards, 0, SERVO_ANGLES_CLOSE)
elif order == "R":
send_orders_servos(boards, 1, SERVO_ANGLES_MID_HIGH)
send_orders_servos(boards, 2, SERVO_ANGLES_BIG_HIGH)
except KeyboardInterrupt:
print("Arret, close du socket")
board.close()
if __name__ == "__main__":
main()
exit(0)