-
Notifications
You must be signed in to change notification settings - Fork 5
/
state_machine.exd
139 lines (113 loc) · 4.25 KB
/
state_machine.exd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os, sys
from std_msgs.msg import Empty
from std_srvs.srv import Trigger
from smach import StateMachine
from smach.state import State
import smach_ros
import rospy
import hbp_nrp_excontrol.nrp_states as states
from hbp_nrp_excontrol.logs import clientLogger
sys.path.append(os.path.join(os.path.dirname(__file__)))
import thimblerigger_config as tc
from thimblerigger_server import ThimbleriggerChallengeServer
from thimblerigger import Thimblerigger
from stepper import Stepper
# Outcome labels of the top-level state machine.
FINISHED = "FINISHED"    # outcome label for a completed run
ERROR = "ERROR"          # outcome label used when a service state aborts
PREEMPTED = "PREEMPTED"  # outcome label used when a state is preempted
def start_callback(userdata, msg):
    """Condition callback for the monitor state that waits for the start signal.

    It unconditionally returns ``False``. ``smach_ros.MonitorState`` reports
    the ``"invalid"`` outcome when its condition callback returns ``False``,
    so the first message received on the monitored topic ends the wait.

    :param userdata: user data handed over by the monitor state (unused).
    :param msg: the message received on the monitored topic (unused).
    :return: always ``False``.
    """
    return False
def step_callback(userdata, msg):
    """Condition callback for the monitor states that wait for a step signal.

    It unconditionally returns ``False``. ``smach_ros.MonitorState`` reports
    the ``"invalid"`` outcome when its condition callback returns ``False``,
    so every message received on the monitored topic ends the current wait.

    :param userdata: user data handed over by the monitor state (unused).
    :param msg: the message received on the monitored topic (unused).
    :return: always ``False``.
    """
    return False
# Create the Thimblerigger instance; every setting is read from the
# thimblerigger_config module (imported as ``tc``).
thimblerigger = Thimblerigger(
    num_mugs=tc.num_mugs,
    num_shuffles=tc.num_shuffles,
    seed=tc.seed,
    movement_rate=tc.movement_rate,
)
# Top-level state machine of the challenge. It alternates between
# "wait_*" monitor states, which block until a message arrives on a topic,
# and service states, which call one Trigger service each.
#
# Both condition callbacks always return False, so a monitor state leaves
# through its "invalid" transition as soon as a message arrives; the "valid"
# transition of the waiting states simply loops back to the same state.
sm = StateMachine(outcomes=[FINISHED, ERROR, PREEMPTED])

# StateMachine.add() must be called while the container is opened, hence
# every state is registered inside this "with" block.
with sm:
    # Wait for challenge start signal
    StateMachine.add(
        "wait_start",
        smach_ros.MonitorState(tc.thimblerigger_started_topic, Empty, start_callback),
        transitions={"valid": "wait_start",
                     "invalid": "wait_lift_mug",
                     "preempted": PREEMPTED}
    )

    # Wait until the user wants to lift the correct mug to see which one she needs to track
    StateMachine.add(
        "wait_lift_mug",
        smach_ros.MonitorState(tc.thimblerigger_step_topic, Empty, step_callback),
        transitions={"valid": "wait_lift_mug",
                     "invalid": "lift_correct_mug",
                     "preempted": PREEMPTED}
    )

    # Lift the correct mug
    StateMachine.add(
        "lift_correct_mug",
        smach_ros.ServiceState(tc.thimblerigger_show_correct_service, Trigger),
        transitions={"succeeded": "wait_hide_mug",
                     "aborted": ERROR,
                     "preempted": PREEMPTED}
    )

    # Wait until the user wants to hide the correct mug again
    StateMachine.add(
        "wait_hide_mug",
        smach_ros.MonitorState(tc.thimblerigger_step_topic, Empty, step_callback),
        transitions={"valid": "wait_hide_mug",
                     "invalid": "hide_correct_mug",
                     "preempted": PREEMPTED}
    )

    # Hide the correct mug again
    StateMachine.add(
        "hide_correct_mug",
        smach_ros.ServiceState(tc.thimblerigger_hide_correct_service, Trigger),
        transitions={"succeeded": "wait_shuffle",
                     "aborted": ERROR,
                     "preempted": PREEMPTED}
    )

    # Wait until the user wants to shuffle
    StateMachine.add(
        "wait_shuffle",
        smach_ros.MonitorState(tc.thimblerigger_step_topic, Empty, step_callback),
        transitions={"valid": "wait_shuffle",
                     "invalid": "shuffle",
                     "preempted": PREEMPTED}
    )

    # Shuffle the mugs
    StateMachine.add(
        "shuffle",
        smach_ros.ServiceState(tc.thimblerigger_shuffle_service, Trigger),
        transitions={"succeeded": "wait_lift_mug2",
                     "aborted": ERROR,
                     "preempted": PREEMPTED}
    )

    # Wait for the next step signal before calling the show-correct service again
    StateMachine.add(
        "wait_lift_mug2",
        smach_ros.MonitorState(tc.thimblerigger_step_topic, Empty, step_callback),
        transitions={"valid": "wait_lift_mug2",
                     "invalid": "lift_correct_mug2",
                     "preempted": PREEMPTED}
    )

    # Lift the correct mug
    StateMachine.add(
        "lift_correct_mug2",
        smach_ros.ServiceState(tc.thimblerigger_show_correct_service, Trigger),
        transitions={"succeeded": "wait",
                     "aborted": ERROR,
                     "preempted": PREEMPTED}
    )

    # Final state: wait for one more message on the step topic.
    # NOTE(review): step_callback always returns False, so a step message
    # leaves this state through "invalid" and the machine ends with ERROR;
    # the "valid" -> FINISHED transition looks unreachable with this
    # callback. Confirm whether ending in ERROR here is intended.
    StateMachine.add(
        "wait",
        smach_ros.MonitorState(tc.thimblerigger_step_topic, Empty, step_callback),
        transitions={"valid": FINISHED,
                     "invalid": ERROR,
                     "preempted": PREEMPTED}
    )
# Create the challenge server and call serve() on it, logging a message
# before and after.
clientLogger.info("Starting Thimblerigger challenge server...")
challenge_server = ThimbleriggerChallengeServer()
challenge_server.serve()
clientLogger.info("Thimblerigger challenge server started.")

# Optional: enable the two lines below to have the challenge run
# automatically.
#stepper = Stepper()
#stepper.run_async()