diff --git a/zod/constants.py b/zod/constants.py index 602e9eb..86596a6 100644 --- a/zod/constants.py +++ b/zod/constants.py @@ -95,9 +95,13 @@ class Lidar(Enum): VELODYNE = "velodyne" +class Radar(Enum): + FRONT = "front" + + Ego = Literal["ego"] EGO = typing.get_args(Ego)[0] -CoordinateFrame = Union[Camera, Lidar, Ego] +CoordinateFrame = Union[Camera, Lidar, Radar, Ego] # ZodFrame properties diff --git a/zod/data_classes/info.py b/zod/data_classes/info.py index d5ef553..b087e9d 100644 --- a/zod/data_classes/info.py +++ b/zod/data_classes/info.py @@ -5,10 +5,10 @@ from typing import Dict, Iterator, List, Optional, Tuple from zod.anno.parser import AnnotationFile -from zod.constants import AnnotationProject, Anonymization, Camera, Lidar +from zod.constants import AnnotationProject, Anonymization, Camera, Lidar, Radar from ._serializable import JSONSerializable -from .sensor import CameraFrame, LidarFrame, SensorFrame +from .sensor import CameraFrame, LidarFrame, RadarFrames, SensorFrame @dataclass @@ -29,6 +29,7 @@ class Information(JSONSerializable): annotations: Dict[AnnotationProject, AnnotationFile] camera_frames: Dict[str, List[CameraFrame]] # key is a combination of Camera and Anonymization lidar_frames: Dict[Lidar, List[LidarFrame]] + radar_frames: Optional[Dict[Radar, List[RadarFrames]]] = None @property def all_frames(self) -> Iterator[SensorFrame]: @@ -130,3 +131,11 @@ def get_camera_frames( def get_lidar_frames(self, lidar: Lidar = Lidar.VELODYNE) -> List[LidarFrame]: return self.lidar_frames[lidar] + + def get_radar_frames(self, radar: Radar) -> RadarFrames: + if self.radar_frames is None: + err = "No radar frames available!" + err += "\nPlease download the latest version of ZOD to access radar data. " + err += "\nRadar is available for ZOD Sequences and Drives only." + raise ValueError(err) + return self.radar_frames[radar] diff --git a/zod/data_classes/sensor.py b/zod/data_classes/sensor.py index 562b12f..f8d06d3 100644 --- a/zod/data_classes/sensor.py +++ b/zod/data_classes/sensor.py @@ -132,6 +132,155 @@ def __eq__(self, other: LidarData) -> Union[bool, np.bool_]: ) +@dataclass +class RadarData: + """A class describing the radar data.""" + + radar_range: np.ndarray # (N,) float32 + azimuth_angle: np.ndarray # (N,) float32 + elevation_angle: np.ndarray # (N,) float32 + range_rate: np.ndarray # (N,) float32 + amplitude: np.ndarray # (N,) float32 + validity: np.ndarray # (N,) int8 + mode: np.ndarray # (N,) uint8 + quality: np.ndarray # (N,) uint8 + scan_index: np.ndarray # (N,) uint32 + timestamp: np.ndarray # (N,) int64 + + def copy(self) -> RadarData: + """Return a copy of the radar data.""" + return RadarData( + radar_range=self.radar_range.copy(), + azimuth_angle=self.azimuth_angle.copy(), + elevation_angle=self.elevation_angle.copy(), + range_rate=self.range_rate.copy(), + amplitude=self.amplitude.copy(), + validity=self.validity.copy(), + mode=self.mode.copy(), + quality=self.quality.copy(), + scan_index=self.scan_index.copy(), + timestamp=self.timestamp, + ) + + @classmethod + def empty(cls) -> RadarData: + """Create an empty radar data object.""" + return cls( + radar_range=np.empty(0, dtype=np.float32), + azimuth_angle=np.empty(0, dtype=np.float32), + elevation_angle=np.empty(0, dtype=np.float32), + range_rate=np.empty(0, dtype=np.float32), + amplitude=np.empty(0, dtype=np.float32), + validity=np.empty(0, dtype=np.int8), + mode=np.empty(0, dtype=np.uint8), + quality=np.empty(0, dtype=np.uint8), + scan_index=np.empty(0, dtype=np.uint32), + timestamp=0, + ) + + @classmethod + def from_npy(cls, path: str) -> RadarData: + """Load radar data from a .npy file. + + Args: + path: Path to the file we are loading the data from.""" + data = np.load(path) + return cls( + radar_range=data["radar_range"], + azimuth_angle=data["azimuth_angle"], + elevation_angle=data["elevation_angle"], + range_rate=data["range_rate"], + amplitude=data["amplitude"], + validity=data["validity"], + mode=data["mode"], + quality=data["quality"], + scan_index=data["scan_index"], + timestamp=data["timestamp"], + ) + + def to_npy(self, path: str) -> None: + """Save radar data to a .npy file in the same format as is used for loading. + + Args: + path: Path of the file we are saving the data in.""" + data = np.empty( + len(self.radar_range), + dtype=[ + ("scan_index", np.uint32), + ("timestamp", np.int64), + ("radar_range", np.float32), + ("azimuth_angle", np.float32), + ("elevation_angle", np.float32), + ("range_rate", np.float32), + ("amplitude", np.float32), + ("validity", np.int8), + ("mode", np.uint8), + ("quality", np.uint8), + ], + ) + + data["radar_range"] = self.radar_range + data["azimuth_angle"] = self.azimuth_angle + data["elevation_angle"] = self.elevation_angle + data["range_rate"] = self.range_rate + data["amplitude"] = self.amplitude + data["validity"] = self.validity + data["mode"] = self.mode + data["quality"] = self.quality + data["scan_index"] = self.scan_index + if len(self.timestamp) == 1: + data["timestamp"] = self.timestamp + else: + times = np.empty(len(self.radar_range), dtype=np.int64) + for i in range(len(self.timestamp)): + times[self.scan_index == i] = self.timestamp[i] + data["timestamp"] = times + + np.save(path, data) + + def get_cartesian_coordinates(self) -> np.ndarray: + """Convert radar data to cartesian coordinates with shape (N x 3).""" + x = self.radar_range * np.cos(self.elevation_angle) * np.cos(self.azimuth_angle) + y = self.radar_range * np.cos(self.elevation_angle) * np.sin(self.azimuth_angle) + z = self.radar_range * np.sin(self.elevation_angle) + return np.vstack((x, y, z)).T + + def extend(self, *other: RadarData): + """Extend this RadarData with data from another RadarData object. + + Args: + other: The other RadarData object. + """ + self.radar_range = np.hstack((self.radar_range, *(o.radar_range for o in other))) + self.azimuth_angle = np.hstack((self.azimuth_angle, *(o.azimuth_angle for o in other))) + self.elevation_angle = np.hstack((self.elevation_angle, *(o.elevation_angle for o in other))) + self.range_rate = np.hstack((self.range_rate, *(o.range_rate for o in other))) + self.amplitude = np.hstack((self.amplitude, *(o.amplitude for o in other))) + self.validity = np.hstack((self.validity, *(o.validity for o in other))) + self.mode = np.hstack((self.mode, *(o.mode for o in other))) + self.quality = np.hstack((self.quality, *(o.quality for o in other))) + self.scan_index = np.hstack((self.scan_index, *(o.scan_index for o in other))) + self.timestamp = np.vstack((self.timestamp, *(o.timestamp for o in other))) + + def __eq__(self, other: RadarData) -> Union[bool, np.bool_]: + """Check if two RadarData objects are equal. + + Args: + other: The other RadarData object.""" + return ( + np.allclose(self.radar_range, other.radar_range) + and np.allclose(self.azimuth_angle, other.azimuth_angle) + and np.allclose(self.elevation_angle, other.elevation_angle) + and np.allclose(self.range_rate, other.range_rate) + and np.allclose(self.amplitude, other.amplitude) + and np.allclose(self.validity, other.validity) + and np.allclose(self.mode, other.mode) + and np.allclose(self.quality, other.quality) + and np.allclose(self.scan_index, other.scan_index) + and np.isclose(self.timestamp, other.timestamp) + ) + + @dataclass class SensorFrame(JSONSerializable): """Class to store sensor information.""" @@ -155,6 +304,18 @@ def read(self, remove_ego_lidar_returns: bool = True) -> LidarData: return LidarData.from_npy(self.filepath, remove_ego_lidar_returns=remove_ego_lidar_returns) +@dataclass +class RadarFrames(JSONSerializable): + """Class to store information about a radar sequence file.""" + + filepath: str + time: datetime # time of the sequence key frame + + def read(self) -> RadarData: + """Read the radar data.""" + return RadarData.from_npy(self.filepath) + + @dataclass class CameraFrame(SensorFrame): """Class to store information about a camera frame."""