-
Notifications
You must be signed in to change notification settings - Fork 4
/
arm.py
157 lines (143 loc) · 4.96 KB
/
arm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from pynput import keyboard
import socket
import argparse
import requests
from socketengine import client
import requests
Server = 'http://192.168.29.139:8000/robotic_arm'
class RoboticArm:
def __init__(self, is_server_running):
self.speeds = dict()
self.speeds.update({1 : 1})
self.speeds.update({2 : 1})
self.speeds.update({3 : 1})
self.speeds.update({4 : 1})
self.speeds.update({5 : 1})
self.speeds.update({6 : 1})
self.motor_active = -1
self.numkey = ""
self.active = False
self.c = client(addr = "127.0.0.1", port = 8001)
self.c.start()
self.is_server_running = is_server_running
if self.is_server_running == True:
self.s = socket.socket()
self.host = '192.168.29.139'
self.port = 9998 # Must be same as that in server.py
print('If you dont see working fine as the next msg , change the host as the ip adress of pi')
# In client.py we use another way to bind host and port together by using connect function()
self.s.connect((self.host, self.port))
print('Working fine!')
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release)
self.done = False
def run(self):
self.listener.start()
while self.done == False:
continue
def sendDataToNode(self, m_no, pos):
for motor in range(1, 7):
if motor == m_no:
self.c.write("m" + str(motor), pos)
else:
self.c.write("m" + str(motor), 0)
def send(self, data):
if self.is_server_running == False:
return
self.s.send(str.encode(data))
checkDataTransfer = self.s.recv(1024)
print(checkDataTransfer)
def forward(self, num):
motornumber = int(format(num)[1])
print(motornumber)
data = str(1) + ','
for i in range(1,9):
if i == motornumber:
data = data + str(self.speeds[motornumber]) + ','
else:
data = data + str(0) + ','
print(data)
self.send(data)
self.sendDataToNode(motornumber, 1)
def back(self, num):
motornumber = int(format(num)[1])
data = str(1) + ','
for i in range(1,9):
if i == motornumber:
data = data + "-" + str(self.speeds[motornumber]) + ','
else:
data = data + str(0) + ','
print(data)
self.send(data)
self.sendDataToNode(motornumber, -1)
def stopall(self):
data = str(1) + ','
for i in range(1,9):
data = data + str(0) + ','
print(data)
self.send(data)
self.sendDataToNode(-1, 1)
def deactivate(self):
self.active = False
self.c.write("status_arm", -1)
self.numkey = ""
def activate(self):
self.active = True
self.c.write("status_arm", 1)
def Stop(self):
print("Exit initiated")
self.deactivate()
self.stopall()
self.s.send(str.encode("stop"))
# After sending we check if it was recieved or not
checkDataTranfer = self.s.recv(1024)
print(checkDataTranfer)
self.done = True
self.c.close()
self.s.close()
def on_press(self, key):
print("finding",format(key))
if(format(key) == "'a'"):
self.activate()
elif(format(key) == "'p'"):
self.deactivate()
elif(format(key) == "'s'"):
self.deactivate()
elif self.active == False:
print("Arm is inactive right now.")
print("If you want to activate the arm, then press the a key!")
return
elif(format(key) in ["'1'","'2'","'3'","'4'","'5'","'6'"]):
self.numkey = key
elif(format(key) == 'Key.up'):
if self.numkey == "":
print("Please select a motor")
elif self.motor_active == -1:
self.forward(self.numkey)
self.motor_active = 1
elif(format(key) == 'Key.down'):
if self.numkey == "":
print("Please select a motor")
elif self.motor_active == -1:
self.back(self.numkey)
self.motor_active = 1
elif(format(key) == "'q'"):
self.Stop()
def on_release(self, key):
self.stopall()
self.motor_active = -1
if key == keyboard.Key.esc: # Stop listener
return False
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--server", type = bool, default = False, help = "Is the server running")
args = vars(ap.parse_args())
if args["server"] == True:
try:
requests.get(Server, timeout = 0.1)
except requests.exceptions.ReadTimeout:
pass
arm = RoboticArm(args["server"])
arm.run()
del arm