-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraph_nav_util.py
138 lines (115 loc) · 5.92 KB
/
graph_nav_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Copyright (c) 2022 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Graph nav utility functions"""
def id_to_short_code(id):
    """Convert a unique id to a 2 letter short code.

    The id is split on '-'. When it has more than two tokens, the short code is the
    first character of the first token followed by the first character of the second.

    Args:
        id: Unique id string. (The name shadows the builtin, but it is part of the
            public signature and is kept for keyword-argument callers.)

    Returns:
        The 2 letter short code, or None when the id has fewer than three
        '-'-separated tokens or one of the first two tokens is empty.
    """
    tokens = id.split('-')
    # An empty leading token (e.g. "-a-b" or "a--b") has no first character to use,
    # so such ids have no short code instead of raising IndexError.
    if len(tokens) > 2 and tokens[0] and tokens[1]:
        return tokens[0][0] + tokens[1][0]
    return None
def pretty_print_waypoints(waypoint_id, waypoint_name, short_code_to_count, localization_id):
    """Print a one-line summary of a waypoint: name, id and short code.

    Args:
        waypoint_id: Unique id of the waypoint to print.
        waypoint_name: Annotation name of the waypoint.
        short_code_to_count: Dict mapping a short code to the number of waypoints
            that share it.
        localization_id: Waypoint id to highlight; the line for the matching waypoint
            is prefixed with '->'.
    """
    short_code = id_to_short_code(waypoint_id)
    # A short code missing from the count dict is treated like a non-unique one
    # (hidden) instead of raising KeyError.
    if short_code is None or short_code_to_count.get(short_code) != 1:
        short_code = ' '  # If the short code is not valid/unique, don't show it.
    marker = '->' if localization_id == waypoint_id else ' '
    print("%s Waypoint name: %s id: %s short code: %s" %
          (marker, waypoint_name, waypoint_id, short_code))
def find_unique_waypoint_id(short_code, graph, name_to_id):
    """Convert either a 2 letter short code or an annotation name into the associated unique id.

    Args:
        short_code: String to resolve: a 2 letter short code, a waypoint annotation
            name, or an already-unique waypoint id.
        graph: Object with a `waypoints` iterable whose items have an `id`, or None
            when no map has been listed yet.
        name_to_id: Dict mapping an annotation name to its waypoint id, or to None
            when the name is shared by several waypoints.

    Returns:
        - None when `graph` is None or when the annotation name is ambiguous
          (a message is printed in both cases).
        - The waypoint id for a unique annotation name or a unique short code.
        - `short_code` unchanged when it is neither a known annotation name nor a
          short code matching exactly one waypoint.
    """
    if graph is None:
        print(
            "Please list the waypoints in the map before trying to navigate to a specific one (Option #4)."
        )
        return None

    if len(short_code) != 2:
        # Not a short code, check if it is an annotation name (instead of the waypoint id).
        if short_code in name_to_id:
            # Short code is a waypoint's annotation name. Check if it is paired with a
            # unique waypoint id.
            waypoint_id = name_to_id[short_code]
            if waypoint_id is not None:
                # Has an associated waypoint id!
                return waypoint_id
            # Format the whole message at once: '%' binds tighter than '+', so applying it
            # to only the second half of a concatenation raises TypeError.
            print("The waypoint name %s is used for multiple different unique waypoints. Please use "
                  "the waypoint id." % (short_code,))
            return None
        # Also not a waypoint annotation name, so we will operate under the assumption
        # that it is a unique waypoint id.
        return short_code

    ret = short_code
    for waypoint in graph.waypoints:
        if short_code == id_to_short_code(waypoint.id):
            if ret != short_code:
                return short_code  # Multiple waypoints with same short code.
            ret = waypoint.id
    return ret
def update_waypoints_and_edges(graph, localization_id, do_print=True):
    """Update and print waypoint ids and edge ids.

    Args:
        graph: Object with `waypoints` (items exposing `id` and `annotations.name`, and
            optionally `annotations.creation_time` with `seconds`/`nanos`) and `edges`
            (items exposing `id.from_waypoint`, `id.to_waypoint` and, when printing,
            `annotations.cost.value`).
        localization_id: Waypoint id highlighted in the printed waypoint listing.
        do_print: When True, print the sorted waypoint listing and every edge.

    Returns:
        Tuple `(name_to_id, edges)`:
        - `name_to_id` maps each non-empty annotation name to its waypoint id, or to
          None when several waypoints share that name.
        - `edges` maps a destination waypoint id to the list of distinct source
          waypoint ids that have an edge into it.
    """
    name_to_id = {}
    edges = {}
    short_code_to_count = {}
    waypoint_to_timestamp = []
    for waypoint in graph.waypoints:
        # Determine the timestamp that this waypoint was created at.
        timestamp = -1.0
        try:
            creation_time = waypoint.annotations.creation_time
            timestamp = creation_time.seconds + creation_time.nanos / 1e9
        except (AttributeError, TypeError):
            # Must be operating on an older graph nav map, since the creation_time is not
            # available within the waypoint annotations message. Only these errors are
            # tolerated so that KeyboardInterrupt/SystemExit are not swallowed.
            pass
        waypoint_name = waypoint.annotations.name
        waypoint_to_timestamp.append((waypoint.id, timestamp, waypoint_name))

        # Determine how many waypoints have the same short code.
        short_code = id_to_short_code(waypoint.id)
        short_code_to_count[short_code] = short_code_to_count.get(short_code, 0) + 1

        # Add the annotation name/id into the current dictionary.
        if waypoint_name:
            if waypoint_name in name_to_id:
                # Waypoint name is used for multiple different waypoints, so set the waypoint
                # id in this dictionary to None to avoid confusion between two different
                # waypoints.
                name_to_id[waypoint_name] = None
            else:
                # First time we have seen this waypoint annotation name. Add it into the
                # dictionary with the respective waypoint unique id.
                name_to_id[waypoint_name] = waypoint.id

    # Sort the set of waypoints by their creation timestamp. If the creation timestamp is
    # unavailable, fallback to sorting by annotation name.
    waypoint_to_timestamp.sort(key=lambda entry: (entry[1], entry[2]))

    # Print out the waypoints name, id, and short code in an order sorted by the timestamp
    # from when the waypoint was created.
    if do_print:
        print('%d waypoints:' % len(graph.waypoints))
        for waypoint_id, _, waypoint_name in waypoint_to_timestamp:
            pretty_print_waypoints(waypoint_id, waypoint_name, short_code_to_count,
                                   localization_id)

    for edge in graph.edges:
        # Group source waypoints by destination, keeping each source only once.
        sources = edges.setdefault(edge.id.to_waypoint, [])
        if edge.id.from_waypoint not in sources:
            sources.append(edge.id.from_waypoint)
        if do_print:
            print("(Edge) from waypoint {} to waypoint {} (cost {})".format(
                edge.id.from_waypoint, edge.id.to_waypoint, edge.annotations.cost.value))

    return name_to_id, edges
def sort_waypoints_chrono(graph):
    """Sort waypoints by time created.

    Args:
        graph: Object with a `waypoints` iterable; each item exposes `id`,
            `annotations.name` and, optionally, `annotations.creation_time` with
            `seconds` and `nanos` fields.

    Returns:
        List of `(waypoint_id, timestamp, annotation_name)` tuples sorted by timestamp
        and then by annotation name. The timestamp is `seconds + nanos / 1e9`, or -1.0
        when the creation time cannot be read.
    """
    waypoint_to_timestamp = []
    for waypoint in graph.waypoints:
        # Determine the timestamp that this waypoint was created at.
        timestamp = -1.0
        try:
            creation_time = waypoint.annotations.creation_time
            timestamp = creation_time.seconds + creation_time.nanos / 1e9
        except (AttributeError, TypeError):
            # Must be operating on an older graph nav map, since the creation_time is not
            # available within the waypoint annotations message. Only these errors are
            # tolerated so that KeyboardInterrupt/SystemExit are not swallowed.
            pass
        waypoint_to_timestamp.append((waypoint.id, timestamp, waypoint.annotations.name))

    # Sort the set of waypoints by their creation timestamp. If the creation timestamp is
    # unavailable, fallback to sorting by annotation name.
    waypoint_to_timestamp.sort(key=lambda entry: (entry[1], entry[2]))
    return waypoint_to_timestamp