-
Notifications
You must be signed in to change notification settings - Fork 1
/
robot.py
80 lines (61 loc) · 2.35 KB
/
robot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from random import Random
from core.deeco import BaseKnowledge
from core.deeco import Component
from core.deeco import Node
from core.deeco import Role
from core.deeco import process
from core.position import Position
from core.packets import TextPacket
# Role
class Rover(Role):
def __init__(self):
super().__init__()
self.position = None
self.goal = None
# Component
class Robot(Component):
SPEED = 0.01
COLORS = ["red", "blue", "green"]
random = Random(0)
@staticmethod
def gen_position():
return Position(Robot.random.uniform(0, 1), Robot.random.uniform(0, 1))
# Knowledge definition
class Knowledge(BaseKnowledge, Rover):
def __init__(self):
super().__init__()
self.color = None
# Component initialization
def __init__(self, node: Node):
super().__init__(node)
# Initialize knowledge
self.knowledge.position = node.positionProvider.get()
self.knowledge.goal = self.gen_position()
self.knowledge.color = self.random.choice(self.COLORS)
# # Register network receive method
# node.networkDevice.add_receiver(self.__receive_packet)
node.position = self.knowledge.position
print("Robot " + str(self.knowledge.id) + " created")
# def __receive_packet(self, packet):
# print((str(self.knowledge.time) + " ms: " + str(self.knowledge.id) + " Received packet: " + str(packet)))
# Processes follow
@process(period_ms=10)
def update_time(self, node: Node):
self.knowledge.time = node.runtime.scheduler.get_time_ms()
@process(period_ms=1000)
def status(self, node: Node):
print(str(self.knowledge.time) + " ms: " + str(self.knowledge.id) + " at " + str(self.knowledge.position) + " goal " + str(
self.knowledge.goal) + " dist: " + str(self.knowledge.position.dist_to(self.knowledge.goal)))
@process(period_ms=100)
def sense_position(self, node: Node):
self.knowledge.position = node.positionProvider.get()
@process(period_ms=1000)
def set_goal(self, node: Node):
if self.knowledge.position == self.knowledge.goal:
self.knowledge.goal = self.gen_position()
node.walker.set_target(self.knowledge.goal)
node.walker.set_target(self.knowledge.goal)
# @process(period_ms=2500)
# def send_echo_packet(self, node: Node):
# node.networkDevice.send(node.id, TextPacket("Echo packet payload from: " + str(self.knowledge.id)))
# node.networkDevice.broadcast(TextPacket("Broadcast echo packet payload from: " + str(self.knowledge.id)))