diff --git a/custom_components/xiaomi_cloud_map_extractor/camera.py b/custom_components/xiaomi_cloud_map_extractor/camera.py index cd13702..a8e2489 100644 --- a/custom_components/xiaomi_cloud_map_extractor/camera.py +++ b/custom_components/xiaomi_cloud_map_extractor/camera.py @@ -1,18 +1,14 @@ import io import logging -import time from datetime import timedelta -from enum import Enum -from typing import Any, Dict, List, Optional +from enum import StrEnum -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts +from vacuum_map_parser_base.config.color import ColorsPalette +from vacuum_map_parser_base.config.drawable import Drawable +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.config.size import Sizes +from vacuum_map_parser_base.config.text import Text -try: - from miio import RoborockVacuum, DeviceException -except ImportError: - from miio import Vacuum as RoborockVacuum, DeviceException import PIL.Image as Image import voluptuous as vol from homeassistant.components.camera import Camera, ENTITY_ID_FORMAT, PLATFORM_SCHEMA, SUPPORT_ON_OFF @@ -21,14 +17,16 @@ from homeassistant.helpers.entity import generate_entity_id from homeassistant.helpers.reload import async_setup_reload_service -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.dreame.vacuum import DreameVacuum -from custom_components.xiaomi_cloud_map_extractor.roidmi.vacuum import RoidmiVacuum -from custom_components.xiaomi_cloud_map_extractor.unsupported.vacuum import UnsupportedVacuum -from custom_components.xiaomi_cloud_map_extractor.viomi.vacuum import ViomiVacuum -from custom_components.xiaomi_cloud_map_extractor.xiaomi.vacuum import XiaomiVacuum +from vacuum_map_parser_base.map_data import MapData + +from .vacuum_platforms.xiaomi_cloud_connector import XiaomiCloudConnector +from .vacuum_platforms.vacuum_base import XiaomiCloudVacuum, VacuumConfig +from .vacuum_platforms.vacuum_dreame import DreameCloudVacuum +from .vacuum_platforms.vacuum_roborock import RoborockCloudVacuum +from .vacuum_platforms.vacuum_roidmi import RoidmiCloudVacuum +from .vacuum_platforms.vacuum_viomi import ViomiCloudVacuum +from .vacuum_platforms.vacuum_unsupported import UnsupportedCloudVacuum +from .const import * _LOGGER = logging.getLogger(__name__) @@ -155,16 +153,17 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info= class VacuumCamera(Camera): def __init__(self, entity_id: str, host: str, token: str, username: str, password: str, country: str, name: str, - should_poll: bool, image_config: ImageConfig, colors: Colors, drawables: Drawables, sizes: Sizes, - texts: Texts, attributes: List[str], store_map_raw: bool, store_map_image: bool, store_map_path: str, - force_api: str): + should_poll: bool, image_config: ImageConfig, colors: ColorsPalette, drawables: list[Drawable], + sizes: Sizes, texts: list[Text], attributes: list[str], store_map_raw: bool, store_map_image: bool, + store_map_path: str, force_api: str): super().__init__() self.entity_id = entity_id self.content_type = CONTENT_TYPE - self._vacuum = RoborockVacuum(host, token) + self._host = host + self._token = token self._connector = XiaomiCloudConnector(username, password) self._status = CameraStatus.INITIALIZING - self._device = None + self._device: XiaomiCloudVacuum | None = None self._name = name self._should_poll = should_poll self._image_config = image_config @@ -183,7 +182,6 @@ def __init__(self, entity_id: str, host: str, token: str, username: str, passwor self._map_data = None self._logged_in = False self._logged_in_previously = True - self._received_map_name_previously = True self._country = country async def async_added_to_hass(self) -> None: @@ -193,7 +191,7 @@ async def async_added_to_hass(self) -> None: def frame_interval(self) -> float: return 1 - def camera_image(self, width: Optional[int] = None, height: Optional[int] = None) -> Optional[bytes]: + def camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None: return self._image @property @@ -211,7 +209,7 @@ def supported_features(self) -> int: return SUPPORT_ON_OFF @property - def extra_state_attributes(self) -> Dict[str, Any]: + def extra_state_attributes(self) -> dict[str, any]: attributes = {} if self._map_data is not None: attributes.update(self.extract_attributes(self._map_data, self._attributes, self._country)) @@ -229,18 +227,17 @@ def should_poll(self) -> bool: return self._should_poll @staticmethod - def extract_attributes(map_data: MapData, attributes_to_return: List[str], country) -> Dict[str, Any]: + def extract_attributes(map_data: MapData, attributes_to_return: list[str], country) -> dict[str, any]: attributes = {} rooms = [] if map_data.rooms is not None: - rooms = dict(filter(lambda x: x[0] is not None, ((x[0], x[1].name) for x in map_data.rooms.items()))) + rooms = dict(filter(lambda x: x[1] is not None, ((x[0], x[1].name) for x in map_data.rooms.items()))) if len(rooms) == 0: rooms = list(map_data.rooms.keys()) for name, value in { ATTRIBUTE_CALIBRATION: map_data.calibration(), - ATTRIBUTE_CARPET_MAP: map_data.carpet_map, ATTRIBUTE_CHARGER: map_data.charger, - ATTRIBUTE_CLEANED_ROOMS: map_data.cleaned_rooms, + ATTRIBUTE_CLEANED_ROOMS: [*(map_data.cleaned_rooms or [])], ATTRIBUTE_COUNTRY: country, ATTRIBUTE_GOTO: map_data.goto, ATTRIBUTE_GOTO_PATH: map_data.goto_path, @@ -270,24 +267,20 @@ def extract_attributes(map_data: MapData, attributes_to_return: List[str], count return attributes def update(self): - counter = 10 if self._status != CameraStatus.TWO_FACTOR_AUTH_REQUIRED and not self._logged_in: - self._handle_login() + self._login() if self._device is None and self._logged_in: - self._handle_device() - map_name = self._handle_map_name(counter) - if map_name == "retry" and self._device is not None: - self._status = CameraStatus.FAILED_TO_RETRIEVE_MAP_FROM_VACUUM - self._received_map_name_previously = map_name != "retry" - if self._logged_in and map_name != "retry" and self._device is not None: - self._handle_map_data(map_name) + self._initialize_device() + if self._logged_in and self._device is not None: + self._download_map_data() else: - _LOGGER.debug("Unable to retrieve map, reasons: Logged in - %s, map name - %s, device retrieved - %s", - self._logged_in, map_name, self._device is not None) - self._set_map_data(MapDataParser.create_empty(self._colors, str(self._status))) + _LOGGER.debug("Unable to retrieve map, reasons: Logged in - %s, device retrieved - %s", + self._logged_in, self._device is not None) + if self._device is not None: + self._set_map_data(self._device.map_data_parser.create_empty(str(self._status))) self._logged_in_previously = self._logged_in - def _handle_login(self): + def _login(self): _LOGGER.debug("Logging in...") self._logged_in = self._connector.login() if self._logged_in is None: @@ -302,9 +295,9 @@ def _handle_login(self): if self._logged_in_previously: _LOGGER.error("Unable to log in, check credentials") - def _handle_device(self): + def _initialize_device(self): _LOGGER.debug("Retrieving device info, country: %s", self._country) - country, user_id, device_id, model = self._connector.get_device_details(self._vacuum.token, self._country) + country, user_id, device_id, model = self._connector.get_device_details(self._token, self._country) if model is not None: self._country = country _LOGGER.debug("Retrieved device model: %s", model) @@ -314,31 +307,9 @@ def _handle_device(self): _LOGGER.error("Failed to retrieve model") self._status = CameraStatus.FAILED_TO_RETRIEVE_DEVICE - def _handle_map_name(self, counter: int) -> str: - map_name = "retry" - if self._device is not None and not self._device.should_get_map_from_vacuum(): - map_name = "0" - while map_name == "retry" and counter > 0: - _LOGGER.debug("Retrieving map name from device") - time.sleep(0.1) - try: - map_name = self._vacuum.map()[0] - _LOGGER.debug("Map name %s", map_name) - except OSError as exc: - _LOGGER.error("Got OSError while fetching the state: %s", exc) - except DeviceException as exc: - if self._received_map_name_previously: - _LOGGER.warning("Got exception while fetching the state: %s", exc) - self._received_map_name_previously = False - finally: - counter = counter - 1 - return map_name - - def _handle_map_data(self, map_name: str): + def _download_map_data(self): _LOGGER.debug("Retrieving map from Xiaomi cloud") - store_map_path = self._store_map_path if self._store_map_raw else None - map_data, map_stored = self._device.get_map(map_name, self._colors, self._drawables, self._texts, - self._sizes, self._image_config, store_map_path) + map_data, map_stored = self._device.get_map() if map_data is not None: # noinspection PyBroadException try: @@ -370,17 +341,33 @@ def _set_map_data(self, map_data: MapData): def _create_device(self, user_id: str, device_id: str, model: str) -> XiaomiCloudVacuum: self._used_api = self._detect_api(model) + store_map_path = self._store_map_path if self._store_map_raw else None + vacuum_config = VacuumConfig( + self._connector, + self._country, + user_id, + device_id, + self._host, + self._token, + model, + self._colors, + self._drawables, + self._image_config, + self._sizes, + self._texts, + store_map_path + ) if self._used_api == CONF_AVAILABLE_API_XIAOMI: - return XiaomiVacuum(self._connector, self._country, user_id, device_id, model) + return RoborockCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_VIOMI: - return ViomiVacuum(self._connector, self._country, user_id, device_id, model) + return ViomiCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_ROIDMI: - return RoidmiVacuum(self._connector, self._country, user_id, device_id, model) + return RoidmiCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_DREAME: - return DreameVacuum(self._connector, self._country, user_id, device_id, model) - return UnsupportedVacuum(self._connector, self._country, user_id, device_id, model) + return DreameCloudVacuum(vacuum_config) + return UnsupportedCloudVacuum(vacuum_config) - def _detect_api(self, model: str) -> Optional[str]: + def _detect_api(self, model: str) -> str | None: if self._forced_api is not None: return self._forced_api if model in API_EXCEPTIONS: @@ -403,7 +390,7 @@ def _store_image(self): _LOGGER.warning("Error while saving image") -class CameraStatus(Enum): +class CameraStatus(StrEnum): EMPTY_MAP = 'Empty map' FAILED_LOGIN = 'Failed to login' FAILED_TO_RETRIEVE_DEVICE = 'Failed to retrieve device' diff --git a/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py deleted file mode 100644 index b05fa32..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py +++ /dev/null @@ -1,359 +0,0 @@ -import logging -import math -from typing import Callable, Dict, List - -from PIL import Image, ImageDraw, ImageFont -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, Obstacle, Path, Point, Room, \ - Wall, Zone -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Color, Colors, Sizes, Texts - -_LOGGER = logging.getLogger(__name__) - - -class ImageHandler: - COLORS = { - COLOR_MAP_INSIDE: (32, 115, 185), - COLOR_MAP_OUTSIDE: (19, 87, 148), - COLOR_MAP_WALL: (100, 196, 254), - COLOR_MAP_WALL_V2: (93, 109, 126), - COLOR_GREY_WALL: (93, 109, 126), - COLOR_CLEANED_AREA: (127, 127, 127, 127), - COLOR_PATH: (147, 194, 238), - COLOR_GOTO_PATH: (0, 255, 0), - COLOR_PREDICTED_PATH: (255, 255, 0), - COLOR_ZONES: (0xAD, 0xD8, 0xFF, 0x8F), - COLOR_ZONES_OUTLINE: (0xAD, 0xD8, 0xFF), - COLOR_VIRTUAL_WALLS: (255, 0, 0), - COLOR_NEW_DISCOVERED_AREA: (64, 64, 64), - COLOR_CARPETS: (0xA9, 0xF7, 0xA9), - COLOR_NO_CARPET_ZONES: (255, 33, 55, 127), - COLOR_NO_CARPET_ZONES_OUTLINE: (255, 0, 0), - COLOR_NO_GO_ZONES: (255, 33, 55, 127), - COLOR_NO_GO_ZONES_OUTLINE: (255, 0, 0), - COLOR_NO_MOPPING_ZONES: (163, 130, 211, 127), - COLOR_NO_MOPPING_ZONES_OUTLINE: (163, 130, 211), - COLOR_CHARGER: (0x66, 0xfe, 0xda, 0x7f), - COLOR_CHARGER_OUTLINE: (0x66, 0xfe, 0xda, 0x7f), - COLOR_ROBO: (0xff, 0xff, 0xff), - COLOR_ROBO_OUTLINE: (0, 0, 0), - COLOR_ROOM_NAMES: (0, 0, 0), - COLOR_OBSTACLE: (0, 0, 0, 128), - COLOR_IGNORED_OBSTACLE: (0, 0, 0, 128), - COLOR_OBSTACLE_WITH_PHOTO: (0, 0, 0, 128), - COLOR_IGNORED_OBSTACLE_WITH_PHOTO: (0, 0, 0, 128), - COLOR_UNKNOWN: (0, 0, 0), - COLOR_SCAN: (0xDF, 0xDF, 0xDF), - COLOR_ROOM_1: (240, 178, 122), - COLOR_ROOM_2: (133, 193, 233), - COLOR_ROOM_3: (217, 136, 128), - COLOR_ROOM_4: (52, 152, 219), - COLOR_ROOM_5: (205, 97, 85), - COLOR_ROOM_6: (243, 156, 18), - COLOR_ROOM_7: (88, 214, 141), - COLOR_ROOM_8: (245, 176, 65), - COLOR_ROOM_9: (252, 212, 81), - COLOR_ROOM_10: (72, 201, 176), - COLOR_ROOM_11: (84, 153, 199), - COLOR_ROOM_12: (133, 193, 233), - COLOR_ROOM_13: (245, 176, 65), - COLOR_ROOM_14: (82, 190, 128), - COLOR_ROOM_15: (72, 201, 176), - COLOR_ROOM_16: (165, 105, 189) - } - ROOM_COLORS = [COLOR_ROOM_1, COLOR_ROOM_2, COLOR_ROOM_3, COLOR_ROOM_4, COLOR_ROOM_5, COLOR_ROOM_6, COLOR_ROOM_7, - COLOR_ROOM_8, COLOR_ROOM_9, COLOR_ROOM_10, COLOR_ROOM_11, COLOR_ROOM_12, COLOR_ROOM_13, - COLOR_ROOM_14, COLOR_ROOM_15, COLOR_ROOM_16] - - @staticmethod - def create_empty_map_image(colors: Colors, text: str = "NO MAP") -> ImageType: - color = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors) - image = Image.new('RGBA', (300, 200), color=color) - if sum(color[0:3]) > 382: - text_color = (0, 0, 0) - else: - text_color = (255, 255, 255) - draw = ImageDraw.Draw(image, "RGBA") - l, t, r, b = draw.textbbox((0, 0), text) - w, h = r - l, b - t - draw.text(((image.size[0] - w) / 2, (image.size[1] - h) / 2), text, fill=text_color) - return image - - @staticmethod - def draw_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float): - ImageHandler.__draw_path__(image, path, sizes[CONF_SIZE_PATH_WIDTH], ImageHandler.__get_color__(COLOR_PATH, colors), scale) - - @staticmethod - def draw_goto_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float): - ImageHandler.__draw_path__(image, path, sizes[CONF_SIZE_PATH_WIDTH], ImageHandler.__get_color__(COLOR_GOTO_PATH, colors), scale) - - @staticmethod - def draw_predicted_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float): - ImageHandler.__draw_path__(image, path, sizes[CONF_SIZE_PATH_WIDTH], ImageHandler.__get_color__(COLOR_PREDICTED_PATH, colors), scale) - - @staticmethod - def draw_mop_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float): - ImageHandler.__draw_path__(image, path, sizes[CONF_SIZE_MOP_PATH_WIDTH], ImageHandler.__get_color__(COLOR_MOP_PATH, colors), scale) - - @staticmethod - def draw_no_carpet_areas(image: ImageData, areas: List[Area], colors: Colors): - ImageHandler.__draw_areas__(image, areas, - ImageHandler.__get_color__(COLOR_NO_CARPET_ZONES, colors), - ImageHandler.__get_color__(COLOR_NO_CARPET_ZONES_OUTLINE, colors)) - - @staticmethod - def draw_no_go_areas(image: ImageData, areas: List[Area], colors: Colors): - ImageHandler.__draw_areas__(image, areas, - ImageHandler.__get_color__(COLOR_NO_GO_ZONES, colors), - ImageHandler.__get_color__(COLOR_NO_GO_ZONES_OUTLINE, colors)) - - @staticmethod - def draw_no_mopping_areas(image: ImageData, areas: List[Area], colors: Colors): - ImageHandler.__draw_areas__(image, areas, - ImageHandler.__get_color__(COLOR_NO_MOPPING_ZONES, colors), - ImageHandler.__get_color__(COLOR_NO_MOPPING_ZONES_OUTLINE, colors)) - - @staticmethod - def draw_walls(image: ImageData, walls: List[Wall], colors: Colors): - draw = ImageDraw.Draw(image.data, 'RGBA') - for wall in walls: - draw.line(wall.to_img(image.dimensions).as_list(), - ImageHandler.__get_color__(COLOR_VIRTUAL_WALLS, colors), width=2) - - @staticmethod - def draw_zones(image: ImageData, zones: List[Zone], colors: Colors): - areas = [z.as_area() for z in zones] - ImageHandler.__draw_areas__(image, areas, - ImageHandler.__get_color__(COLOR_ZONES, colors), - ImageHandler.__get_color__(COLOR_ZONES_OUTLINE, colors)) - - @staticmethod - def draw_charger(image: ImageData, charger: Point, sizes: Sizes, colors: Colors): - color = ImageHandler.__get_color__(COLOR_CHARGER, colors) - outline = ImageHandler.__get_color__(COLOR_CHARGER_OUTLINE, colors) - radius = sizes[CONF_SIZE_CHARGER_RADIUS] - ImageHandler.__draw_pieslice__(image, charger, radius, outline, color) - - @staticmethod - def draw_obstacles(image: ImageData, obstacles, sizes: Sizes, colors: Colors): - color = ImageHandler.__get_color__(COLOR_OBSTACLE, colors) - radius = sizes[CONF_SIZE_OBSTACLE_RADIUS] - ImageHandler.draw_all_obstacles(image, obstacles, radius, color) - - @staticmethod - def draw_ignored_obstacles(image: ImageData, obstacles: List[Obstacle], sizes: Sizes, colors: Colors): - color = ImageHandler.__get_color__(COLOR_IGNORED_OBSTACLE, colors) - radius = sizes[CONF_SIZE_IGNORED_OBSTACLE_RADIUS] - ImageHandler.draw_all_obstacles(image, obstacles, radius, color) - - @staticmethod - def draw_obstacles_with_photo(image: ImageData, obstacles: List[Obstacle], sizes: Sizes, colors: Colors): - color = ImageHandler.__get_color__(COLOR_OBSTACLE_WITH_PHOTO, colors) - radius = sizes[CONF_SIZE_OBSTACLE_WITH_PHOTO_RADIUS] - ImageHandler.draw_all_obstacles(image, obstacles, radius, color) - - @staticmethod - def draw_ignored_obstacles_with_photo(image: ImageData, obstacles: List[Obstacle], sizes: Sizes, colors: Colors): - color = ImageHandler.__get_color__(COLOR_IGNORED_OBSTACLE_WITH_PHOTO, colors) - radius = sizes[CONF_SIZE_IGNORED_OBSTACLE_WITH_PHOTO_RADIUS] - ImageHandler.draw_all_obstacles(image, obstacles, radius, color) - - @staticmethod - def draw_all_obstacles(image: ImageData, obstacles: List[Obstacle], radius: float, color: Color): - for obstacle in obstacles: - ImageHandler.__draw_circle__(image, obstacle, radius, color, color) - - @staticmethod - def draw_vacuum_position(image: ImageData, vacuum_position: Point, sizes: Sizes, colors: Colors): - color = ImageHandler.__get_color__(COLOR_ROBO, colors) - outline = ImageHandler.__get_color__(COLOR_ROBO_OUTLINE, colors) - radius = sizes[CONF_SIZE_VACUUM_RADIUS] - ImageHandler.__draw_vacuum__(image, vacuum_position, radius, outline, color) - - @staticmethod - def draw_room_names(image: ImageData, rooms: Dict[int, Room], colors: Colors): - color = ImageHandler.__get_color__(COLOR_ROOM_NAMES, colors) - for room in rooms.values(): - p = room.point() - if p is not None: - point = p.to_img(image.dimensions) - ImageHandler.__draw_text__(image, room.name, point.x, point.y, color) - - @staticmethod - def rotate(image: ImageData): - if image.dimensions.rotation == 90: - image.data = image.data.transpose(Image.ROTATE_90) - if image.dimensions.rotation == 180: - image.data = image.data.transpose(Image.ROTATE_180) - if image.dimensions.rotation == 270: - image.data = image.data.transpose(Image.ROTATE_270) - - @staticmethod - def draw_texts(image: ImageData, texts: Texts): - for text_config in texts: - x = text_config[CONF_X] * image.data.size[0] / 100 - y = text_config[CONF_Y] * image.data.size[1] / 100 - ImageHandler.__draw_text__(image, text_config[CONF_TEXT], x, y, text_config[CONF_COLOR], - text_config[CONF_FONT], text_config[CONF_FONT_SIZE]) - - @staticmethod - def draw_layer(image: ImageData, layer_name: str): - ImageHandler.__draw_layer__(image, image.additional_layers[layer_name]) - - @staticmethod - def __use_transparency__(*colors): - return any(len(color) > 3 for color in colors) - - @staticmethod - def __draw_vacuum__(image: ImageData, vacuum_pos, r, outline, fill): - def draw_func(draw: ImageDraw): - if vacuum_pos.a is None: - vacuum_pos.a = 0 - point = vacuum_pos.to_img(image.dimensions) - r_scaled = r / 16 - # main outline - coords = [point.x - r, point.y - r, point.x + r, point.y + r] - draw.ellipse(coords, outline=outline, fill=fill) - if r >= 8: - # secondary outline - r2 = r_scaled * 14 - x = point.x - y = point.y - coords = [x - r2, y - r2, x + r2, y + r2] - draw.ellipse(coords, outline=outline, fill=None) - # bin cover - a1 = (vacuum_pos.a + 104) / 180 * math.pi - a2 = (vacuum_pos.a - 104) / 180 * math.pi - r2 = r_scaled * 13 - x1 = point.x - r2 * math.cos(a1) - y1 = point.y + r2 * math.sin(a1) - x2 = point.x - r2 * math.cos(a2) - y2 = point.y + r2 * math.sin(a2) - draw.line([x1, y1, x2, y2], width=1, fill=outline) - # lidar - angle = vacuum_pos.a / 180 * math.pi - r2 = r_scaled * 3 - x = point.x + r2 * math.cos(angle) - y = point.y - r2 * math.sin(angle) - r2 = r_scaled * 4 - coords = [x - r2, y - r2, x + r2, y + r2] - draw.ellipse(coords, outline=outline, fill=fill) - # button - half_color = ( - (outline[0] + fill[0]) // 2, - (outline[1] + fill[1]) // 2, - (outline[2] + fill[2]) // 2 - ) - r2 = r_scaled * 10 - x = point.x + r2 * math.cos(angle) - y = point.y - r2 * math.sin(angle) - r2 = r_scaled * 2 - coords = [x - r2, y - r2, x + r2, y + r2] - draw.ellipse(coords, outline=half_color, fill=half_color) - - ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(outline, fill)) - - @staticmethod - def __draw_circle__(image: ImageData, center: Point, r: float, outline: Color, fill: Color): - def draw_func(draw: ImageDraw): - point = center.to_img(image.dimensions) - coords = [point.x - r, point.y - r, point.x + r, point.y + r] - draw.ellipse(coords, outline=outline, fill=fill) - - ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(outline, fill)) - - @staticmethod - def __draw_pieslice__(image: ImageData, position, r, outline, fill): - def draw_func(draw: ImageDraw): - point = position.to_img(image.dimensions) - angle = -position.a if position.a is not None else 0 - coords = [point.x - r, point.y - r, point.x + r, point.y + r] - draw.pieslice(coords, angle + 90, angle - 90, outline=outline, fill=fill) - - ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(outline, fill)) - - @staticmethod - def __draw_areas__(image: ImageData, areas: List[Area], fill: Color, outline: Color): - if len(areas) == 0: - return - - use_transparency = ImageHandler.__use_transparency__(outline, fill) - for area in areas: - def draw_func(draw: ImageDraw): - draw.polygon(area.to_img(image.dimensions).as_list(), fill, outline) - - ImageHandler.__draw_on_new_layer__(image, draw_func, 1, use_transparency) - - @staticmethod - def __draw_path__(image: ImageData, path: Path, path_width: int, color: Color, scale: float): - if len(path.path) < 1: - return - - def draw_func(draw: ImageDraw): - for current_path in path.path: - if len(current_path) > 1: - s = current_path[0].to_img(image.dimensions) - coords = None - for point in current_path[1:]: - e = point.to_img(image.dimensions) - sx = s.x * scale - sy = s.y * scale - ex = e.x * scale - ey = e.y * scale - draw.line([sx, sy, ex, ey], width=int(scale * path_width), fill=color) - if path_width > 4: - r = scale * path_width / 2 - if not coords: - coords = [sx - r, sy - r, sx + r, sy + r] - draw.pieslice(coords, 0, 360, outline=color, fill=color) - coords = [ex - r, ey - r, ex + r, ey + r] - draw.pieslice(coords, 0, 360, outline=color, fill=color) - s = e - - ImageHandler.__draw_on_new_layer__(image, draw_func, scale, ImageHandler.__use_transparency__(color)) - - @staticmethod - def __draw_text__(image: ImageData, text: str, x: float, y: float, color: Color, font_file=None, font_size=None): - def draw_func(draw: ImageDraw): - font = ImageFont.load_default() - try: - if font_file is not None and font_size > 0: - font = ImageFont.truetype(font_file, font_size) - except OSError: - _LOGGER.warning("Unable to find font file: %s", font_file) - except ImportError: - _LOGGER.warning("Unable to open font: %s", font_file) - finally: - l, t, r, b = draw.textbbox((0, 0), text, font) - w, h = r - l, b - t - draw.text((x - w / 2, y - h / 2), text, font=font, fill=color) - - ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(color)) - - @staticmethod - def __get_color__(name, colors: Colors, default_name: str = None) -> Color: - if name in colors: - return colors[name] - if default_name is None: - return ImageHandler.COLORS[name] - return ImageHandler.COLORS[default_name] - - @staticmethod - def __draw_on_new_layer__(image: ImageData, draw_function: Callable, scale: float = 1, use_transparency=False): - if scale == 1 and not use_transparency: - draw = ImageDraw.Draw(image.data, "RGBA") - draw_function(draw) - else: - size = [int(image.data.size[0] * scale), int(image.data.size[1] * scale)] - layer = Image.new("RGBA", size, (255, 255, 255, 0)) - draw = ImageDraw.Draw(layer, "RGBA") - draw_function(draw) - if scale != 1: - layer = layer.resize(image.data.size, resample=Image.BOX) - ImageHandler.__draw_layer__(image, layer) - - @staticmethod - def __draw_layer__(image: ImageData, layer: ImageType): - image.data = Image.alpha_composite(image.data, layer) diff --git a/custom_components/xiaomi_cloud_map_extractor/common/map_data.py b/custom_components/xiaomi_cloud_map_extractor/common/map_data.py deleted file mode 100644 index c840674..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/common/map_data.py +++ /dev/null @@ -1,327 +0,0 @@ -from __future__ import annotations - -from typing import Any, Callable, Dict, List, Optional, Set - -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import CalibrationPoints, ImageConfig - - -class Point: - def __init__(self, x: float, y: float, a=None): - self.x = x - self.y = y - self.a = a - - def __str__(self) -> str: - if self.a is None: - return f"({self.x}, {self.y})" - return f"({self.x}, {self.y}, a = {self.a})" - - def __repr__(self) -> str: - return self.__str__() - - def __eq__(self, other: Point) -> bool: - return other is not None and self.x == other.x and self.y == other.y and self.a == other.a - - def as_dict(self) -> Dict[str, Any]: - if self.a is None: - return { - ATTR_X: self.x, - ATTR_Y: self.y - } - return { - ATTR_X: self.x, - ATTR_Y: self.y, - ATTR_A: self.a - } - - def to_img(self, image_dimensions) -> Point: - return image_dimensions.to_img(self) - - def rotated(self, image_dimensions) -> Point: - alpha = image_dimensions.rotation - w = int(image_dimensions.width * image_dimensions.scale) - h = int(image_dimensions.height * image_dimensions.scale) - x = self.x - y = self.y - while alpha > 0: - tmp = y - y = w - x - x = tmp - tmp = h - h = w - w = tmp - alpha = alpha - 90 - return Point(x, y) - - def __mul__(self, other) -> Point: - return Point(self.x * other, self.y * other, self.a) - - def __truediv__(self, other) -> Point: - return Point(self.x / other, self.y / other, self.a) - - -class Obstacle(Point): - def __init__(self, x: float, y: float, details: Dict[str, Any]): - super().__init__(x, y) - self.details = details - - def as_dict(self) -> Dict[str, Any]: - return {**super(Obstacle, self).as_dict(), **self.details} - - def __str__(self) -> str: - return f"({self.x}, {self.y}, details = {self.details})" - - -class ImageDimensions: - def __init__(self, top: int, left: int, height: int, width: int, scale: float, rotation: int, - img_transformation: Callable[[Point], Point]): - self.top = top - self.left = left - self.height = height - self.width = width - self.scale = scale - self.rotation = rotation - self.img_transformation = img_transformation - - def to_img(self, point: Point) -> Point: - p = self.img_transformation(point) - return Point((p.x - self.left) * self.scale, (self.height - (p.y - self.top) - 1) * self.scale) - - -class ImageData: - def __init__(self, size: int, top: int, left: int, height: int, width: int, image_config: ImageConfig, - data: ImageType, img_transformation: Callable[[Point], Point], additional_layers: dict = None): - trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100) - trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100) - trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100) - trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100) - scale = image_config[CONF_SCALE] - rotation = image_config[CONF_ROTATE] - self.size = size - self.dimensions = ImageDimensions(top + trim_bottom, - left + trim_left, - height - trim_top - trim_bottom, - width - trim_left - trim_right, - scale, - rotation, img_transformation) - self.is_empty = height == 0 or width == 0 - self.data = data - if additional_layers is None: - self.additional_layers = {} - else: - self.additional_layers = dict(filter(lambda l: l[1] is not None, additional_layers.items())) - - def as_dict(self) -> Dict[str, Any]: - return { - ATTR_SIZE: self.size, - ATTR_OFFSET_Y: self.dimensions.top, - ATTR_OFFSET_X: self.dimensions.left, - ATTR_HEIGHT: self.dimensions.height, - ATTR_SCALE: self.dimensions.scale, - ATTR_ROTATION: self.dimensions.rotation, - ATTR_WIDTH: self.dimensions.width - } - - @staticmethod - def create_empty(data: ImageType) -> ImageData: - image_config = { - CONF_TRIM: { - CONF_LEFT: 0, - CONF_RIGHT: 0, - CONF_TOP: 0, - CONF_BOTTOM: 0 - }, - CONF_SCALE: 1, - CONF_ROTATE: 0 - } - return ImageData(0, 0, 0, 0, 0, image_config, data, lambda p: p) - - -class Path: - def __init__(self, point_length: Optional[int], point_size: Optional[int], angle: Optional[int], - path: List[List[Point]]): - self.point_length = point_length - self.point_size = point_size - self.angle = angle - self.path = path - - def as_dict(self) -> Dict[str, Any]: - return { - ATTR_POINT_LENGTH: self.point_length, - ATTR_POINT_SIZE: self.point_size, - ATTR_ANGLE: self.angle, - ATTR_PATH: self.path - } - - -class Zone: - def __init__(self, x0: float, y0: float, x1: float, y1: float): - self.x0 = x0 - self.y0 = y0 - self.x1 = x1 - self.y1 = y1 - - def __str__(self) -> str: - return f"[{self.x0}, {self.y0}, {self.x1}, {self.y1}]" - - def __repr__(self) -> str: - return self.__str__() - - def as_dict(self) -> Dict[str, Any]: - return { - ATTR_X0: self.x0, - ATTR_Y0: self.y0, - ATTR_X1: self.x1, - ATTR_Y1: self.y1 - } - - def as_area(self) -> Area: - return Area(self.x0, self.y0, self.x0, self.y1, self.x1, self.y1, self.x1, self.y0) - - -class Room(Zone): - def __init__(self, number: int, x0: Optional[float], y0: Optional[float], x1: Optional[float], y1: Optional[float], - name: str = None, pos_x: float = None, pos_y: float = None): - super().__init__(x0, y0, x1, y1) - self.number = number - self.name = name - self.pos_x = pos_x - self.pos_y = pos_y - - def as_dict(self) -> Dict[str, Any]: - super_dict = {**super(Room, self).as_dict()} - if self.name is not None: - super_dict[ATTR_NAME] = self.name - if self.pos_x is not None: - super_dict[ATTR_X] = self.pos_x - if self.pos_y is not None: - super_dict[ATTR_Y] = self.pos_y - return super_dict - - def __str__(self) -> str: - return f"[number: {self.number}, name: {self.name}, {self.x0}, {self.y0}, {self.x1}, {self.y1}]" - - def __repr__(self) -> str: - return self.__str__() - - def point(self) -> Optional[Point]: - if self.pos_x is not None and self.pos_y is not None and self.name is not None: - return Point(self.pos_x, self.pos_y) - return None - - -class Wall: - def __init__(self, x0: float, y0: float, x1: float, y1: float): - self.x0 = x0 - self.y0 = y0 - self.x1 = x1 - self.y1 = y1 - - def __str__(self) -> str: - return f"[{self.x0}, {self.y0}, {self.x1}, {self.y1}]" - - def __repr__(self) -> str: - return self.__str__() - - def as_dict(self) -> Dict[str, Any]: - return { - ATTR_X0: self.x0, - ATTR_Y0: self.y0, - ATTR_X1: self.x1, - ATTR_Y1: self.y1 - } - - def to_img(self, image_dimensions) -> Wall: - p0 = Point(self.x0, self.y0).to_img(image_dimensions) - p1 = Point(self.x1, self.y1).to_img(image_dimensions) - return Wall(p0.x, p0.y, p1.x, p1.y) - - def as_list(self) -> List[float]: - return [self.x0, self.y0, self.x1, self.y1] - - -class Area: - def __init__(self, x0: float, y0: float, x1: float, y1: float, x2: float, y2: float, x3: float, y3: float): - self.x0 = x0 - self.y0 = y0 - self.x1 = x1 - self.y1 = y1 - self.x2 = x2 - self.y2 = y2 - self.x3 = x3 - self.y3 = y3 - - def __str__(self) -> str: - return f"[{self.x0}, {self.y0}, {self.x1}, {self.y1}, {self.x2}, {self.y2}, {self.x3}, {self.y3}]" - - def __repr__(self) -> str: - return self.__str__() - - def as_dict(self) -> Dict[str, Any]: - return { - ATTR_X0: self.x0, - ATTR_Y0: self.y0, - ATTR_X1: self.x1, - ATTR_Y1: self.y1, - ATTR_X2: self.x2, - ATTR_Y2: self.y2, - ATTR_X3: self.x3, - ATTR_Y3: self.y3 - } - - def as_list(self) -> List[float]: - return [self.x0, self.y0, self.x1, self.y1, self.x2, self.y2, self.x3, self.y3] - - def to_img(self, image_dimensions) -> Area: - p0 = Point(self.x0, self.y0).to_img(image_dimensions) - p1 = Point(self.x1, self.y1).to_img(image_dimensions) - p2 = Point(self.x2, self.y2).to_img(image_dimensions) - p3 = Point(self.x3, self.y3).to_img(image_dimensions) - return Area(p0.x, p0.y, p1.x, p1.y, p2.x, p2.y, p3.x, p3.y) - - -class MapData: - def __init__(self, calibration_center: float = 0, calibration_diff: float = 0): - self._calibration_center = calibration_center - self._calibration_diff = calibration_diff - self.blocks = None - self.charger: Optional[Point] = None - self.goto: Optional[List[Point]] = None - self.goto_path: Optional[Path] = None - self.image: Optional[ImageData] = None - self.no_go_areas: Optional[List[Area]] = None - self.no_mopping_areas: Optional[List[Area]] = None - self.no_carpet_areas: Optional[List[Area]] = None - self.carpet_map: Optional[Set[int]] = [] - self.obstacles: Optional[List[Obstacle]] = None - self.ignored_obstacles: Optional[List[Obstacle]] = None - self.obstacles_with_photo: Optional[List[Obstacle]] = None - self.ignored_obstacles_with_photo: Optional[List[Obstacle]] = None - self.path: Optional[Path] = None - self.predicted_path: Optional[Path] = None - self.mop_path: Optional[Path] = None - self.rooms: Optional[Dict[int, Room]] = None - self.vacuum_position: Optional[Point] = None - self.vacuum_room: Optional[int] = None - self.vacuum_room_name: Optional[str] = None - self.walls: Optional[List[Wall]] = None - self.zones: Optional[List[Zone]] = None - self.cleaned_rooms: Optional[Set[int]] = None - self.map_name: Optional[str] = None - - def calibration(self) -> Optional[CalibrationPoints]: - if self.image.is_empty: - return None - calibration_points = [] - for point in [Point(self._calibration_center, self._calibration_center), - Point(self._calibration_center + self._calibration_diff, self._calibration_center), - Point(self._calibration_center, self._calibration_center + self._calibration_diff)]: - img_point = point.to_img(self.image.dimensions).rotated(self.image.dimensions) - calibration_points.append({ - "vacuum": {"x": point.x, "y": point.y}, - "map": {"x": int(img_point.x), "y": int(img_point.y)} - }) - return calibration_points diff --git a/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py deleted file mode 100644 index 62ed23c..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py +++ /dev/null @@ -1,64 +0,0 @@ -import logging - -from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler -from custom_components.xiaomi_cloud_map_extractor.common.map_data import ImageData, MapData -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts - -_LOGGER = logging.getLogger(__name__) - - -class MapDataParser: - - @staticmethod - def create_empty(colors: Colors, text: str) -> MapData: - map_data = MapData() - empty_map = ImageHandler.create_empty_map_image(colors, text) - map_data.image = ImageData.create_empty(empty_map) - return map_data - - @staticmethod - def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes, - image_config: ImageConfig, *args, **kwargs) -> MapData: - pass - - @staticmethod - def draw_elements(colors: Colors, drawables: Drawables, sizes: Sizes, map_data: MapData, image_config: ImageConfig): - scale = float(image_config[CONF_SCALE]) - - for drawable in drawables: - if DRAWABLE_CHARGER == drawable and map_data.charger is not None: - ImageHandler.draw_charger(map_data.image, map_data.charger, sizes, colors) - if DRAWABLE_VACUUM_POSITION == drawable and map_data.vacuum_position is not None: - ImageHandler.draw_vacuum_position(map_data.image, map_data.vacuum_position, sizes, colors) - if DRAWABLE_OBSTACLES == drawable and map_data.obstacles is not None: - ImageHandler.draw_obstacles(map_data.image, map_data.obstacles, sizes, colors) - if DRAWABLE_IGNORED_OBSTACLES == drawable and map_data.ignored_obstacles is not None: - ImageHandler.draw_ignored_obstacles(map_data.image, map_data.ignored_obstacles, sizes, colors) - if DRAWABLE_OBSTACLES_WITH_PHOTO == drawable and map_data.obstacles_with_photo is not None: - ImageHandler.draw_obstacles_with_photo(map_data.image, map_data.obstacles_with_photo, sizes, colors) - if DRAWABLE_IGNORED_OBSTACLES_WITH_PHOTO == drawable and map_data.ignored_obstacles_with_photo is not None: - ImageHandler.draw_ignored_obstacles_with_photo(map_data.image, map_data.ignored_obstacles_with_photo, - sizes, colors) - if DRAWABLE_MOP_PATH == drawable and map_data.mop_path is not None: - ImageHandler.draw_mop_path(map_data.image, map_data.mop_path, sizes, colors, scale) - if DRAWABLE_PATH == drawable and map_data.path is not None: - ImageHandler.draw_path(map_data.image, map_data.path, sizes, colors, scale) - if DRAWABLE_GOTO_PATH == drawable and map_data.goto_path is not None: - ImageHandler.draw_goto_path(map_data.image, map_data.goto_path, sizes, colors, scale) - if DRAWABLE_PREDICTED_PATH == drawable and map_data.predicted_path is not None: - ImageHandler.draw_predicted_path(map_data.image, map_data.predicted_path, sizes, colors, scale) - if DRAWABLE_NO_CARPET_AREAS == drawable and map_data.no_carpet_areas is not None: - ImageHandler.draw_no_carpet_areas(map_data.image, map_data.no_carpet_areas, colors) - if DRAWABLE_NO_GO_AREAS == drawable and map_data.no_go_areas is not None: - ImageHandler.draw_no_go_areas(map_data.image, map_data.no_go_areas, colors) - if DRAWABLE_NO_MOPPING_AREAS == drawable and map_data.no_mopping_areas is not None: - ImageHandler.draw_no_mopping_areas(map_data.image, map_data.no_mopping_areas, colors) - if DRAWABLE_VIRTUAL_WALLS == drawable and map_data.walls is not None: - ImageHandler.draw_walls(map_data.image, map_data.walls, colors) - if DRAWABLE_ZONES == drawable and map_data.zones is not None: - ImageHandler.draw_zones(map_data.image, map_data.zones, colors) - if DRAWABLE_CLEANED_AREA == drawable and DRAWABLE_CLEANED_AREA in map_data.image.additional_layers: - ImageHandler.draw_layer(map_data.image, drawable) - if DRAWABLE_ROOM_NAMES == drawable and map_data.rooms is not None: - ImageHandler.draw_room_names(map_data.image, map_data.rooms, colors) diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py deleted file mode 100644 index 6b46841..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py +++ /dev/null @@ -1,66 +0,0 @@ -from abc import abstractmethod -from typing import Optional, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts - - -class XiaomiCloudVacuum: - - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): - self._connector = connector - self._country = country - self._user_id = user_id - self._device_id = device_id - self.model = model - - def get_map(self, - map_name: str, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig, - store_map_path: Optional[str] = None) -> Tuple[Optional[MapData], bool]: - response = self.get_raw_map_data(map_name) - if response is None: - return None, False - map_stored = False - if store_map_path is not None: - raw_map_file = open(f"{store_map_path}/map_data_{self.model}.{self.get_map_archive_extension()}", "wb") - raw_map_file.write(response) - raw_map_file.close() - map_stored = True - map_data = self.decode_map(response, colors, drawables, texts, sizes, image_config) - if map_data is None: - return None, map_stored - map_data.map_name = map_name - return map_data, map_stored - - def get_raw_map_data(self, map_name: Optional[str]) -> Optional[bytes]: - if map_name is None: - return None - map_url = self.get_map_url(map_name) - return self._connector.get_raw_map_data(map_url) - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> Optional[MapData]: - return MapDataParser.create_empty(colors, f"Vacuum\n{self.model}\nis not supported") - - @abstractmethod - def get_map_url(self, map_name: str) -> Optional[str]: - pass - - @abstractmethod - def should_get_map_from_vacuum(self) -> bool: - pass - - def get_map_archive_extension(self) -> str: - pass diff --git a/custom_components/xiaomi_cloud_map_extractor/const.py b/custom_components/xiaomi_cloud_map_extractor/const.py index ff49715..2af650e 100644 --- a/custom_components/xiaomi_cloud_map_extractor/const.py +++ b/custom_components/xiaomi_cloud_map_extractor/const.py @@ -55,7 +55,6 @@ DEFAULT_NAME = "Xiaomi Cloud Map Extractor" ATTRIBUTE_CALIBRATION = "calibration_points" -ATTRIBUTE_CARPET_MAP = "carpet_map" ATTRIBUTE_CHARGER = "charger" ATTRIBUTE_CLEANED_ROOMS = "cleaned_rooms" ATTRIBUTE_COUNTRY = "country" @@ -83,7 +82,7 @@ ATTRIBUTE_WALLS = "walls" ATTRIBUTE_ZONES = "zones" -CONF_AVAILABLE_ATTRIBUTES = [ATTRIBUTE_CALIBRATION, ATTRIBUTE_CARPET_MAP, ATTRIBUTE_NO_CARPET_AREAS, +CONF_AVAILABLE_ATTRIBUTES = [ATTRIBUTE_CALIBRATION, ATTRIBUTE_NO_CARPET_AREAS, ATTRIBUTE_CHARGER, ATTRIBUTE_CLEANED_ROOMS, ATTRIBUTE_COUNTRY, ATTRIBUTE_GOTO, ATTRIBUTE_GOTO_PATH, ATTRIBUTE_GOTO_PREDICTED_PATH, ATTRIBUTE_IGNORED_OBSTACLES, ATTRIBUTE_IGNORED_OBSTACLES_WITH_PHOTO, ATTRIBUTE_IMAGE, diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/__init__.py b/custom_components/xiaomi_cloud_map_extractor/dreame/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/dreame/image_handler.py deleted file mode 100644 index 4f79975..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/dreame/image_handler.py +++ /dev/null @@ -1,86 +0,0 @@ -import logging -from enum import IntEnum -from typing import Dict, Tuple - -from PIL import Image -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Room -from custom_components.xiaomi_cloud_map_extractor.const import \ - CONF_SCALE, CONF_TRIM, CONF_LEFT, CONF_RIGHT, CONF_TOP, CONF_BOTTOM, \ - COLOR_MAP_OUTSIDE, COLOR_MAP_INSIDE, COLOR_MAP_WALL, COLOR_ROOM_PREFIX - -_LOGGER = logging.getLogger(__name__) - - -class ImageHandlerDreame(ImageHandler): - class PixelTypes(IntEnum): - NONE = 0 - FLOOR = 1 - WALL = 2 - - @staticmethod - def parse(raw_data: bytes, header, colors, image_config, map_data_type: str) -> Tuple[ImageType, Dict[int, Room]]: - scale = image_config[CONF_SCALE] - trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * header.image_width / 100) - trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * header.image_width / 100) - trim_top = int(image_config[CONF_TRIM][CONF_TOP] * header.image_height / 100) - trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * header.image_height / 100) - trimmed_height = header.image_height - trim_top - trim_bottom - trimmed_width = header.image_width - trim_left - trim_right - image = Image.new('RGBA', (trimmed_width, trimmed_height)) - if header.image_width == 0 or header.image_height == 0: - return ImageHandler.create_empty_map_image(colors), {} - pixels = image.load() - rooms = {} - - for img_y in range(trimmed_height): - for img_x in range(trimmed_width): - x = img_x - y = trimmed_height - img_y - 1 - room_x = img_x + trim_left - room_y = img_y + trim_bottom - - # TODO : use MapDataParserDreame.MapDataTypes enum - if map_data_type == "regular": - px = raw_data[img_x + trim_left + header.image_width * (img_y + trim_bottom)] - segment_id = px >> 2 - if 0 < segment_id < 62: - if segment_id not in rooms: - rooms[segment_id] = Room(segment_id, room_x, room_y, room_x, room_y) - rooms[segment_id] = Room(segment_id, - min(rooms[segment_id].x0, room_x), min(rooms[segment_id].y0, room_y), - max(rooms[segment_id].x1, room_x), max(rooms[segment_id].y1, room_y)) - default = ImageHandler.ROOM_COLORS[segment_id >> 1] - pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{segment_id}", colors, default) - else: - masked_px = px & 0b00000011 - - if masked_px == ImageHandlerDreame.PixelTypes.NONE: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors) - elif masked_px == ImageHandlerDreame.PixelTypes.FLOOR: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_INSIDE, colors) - elif masked_px == ImageHandlerDreame.PixelTypes.WALL: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL, colors) - else: - _LOGGER.warning(f'unhandled pixel type: {px}') - elif map_data_type == "rism": - px = raw_data[img_x + trim_left + header.image_width * (img_y + trim_bottom)] - segment_id = px & 0b01111111 - wall_flag = px >> 7 - - if wall_flag: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL, colors) - elif segment_id > 0: - if segment_id not in rooms: - rooms[segment_id] = Room(segment_id, room_x, room_y, room_x, room_y) - rooms[segment_id] = Room(segment_id, - min(rooms[segment_id].x0, room_x), min(rooms[segment_id].y0, room_y), - max(rooms[segment_id].x1, room_x), max(rooms[segment_id].y1, room_y)) - default = ImageHandler.ROOM_COLORS[segment_id >> 1] - pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{segment_id}", colors, default) - - if image_config["scale"] != 1 and header.image_width != 0 and header.image_height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - return image, rooms diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/dreame/map_data_parser.py deleted file mode 100644 index 6a960cd..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/dreame/map_data_parser.py +++ /dev/null @@ -1,245 +0,0 @@ -import base64 -import json -import logging -import re -import zlib -from enum import Enum, IntEnum -from typing import Dict, List, Optional, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \ - Wall -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.dreame.image_handler import ImageHandlerDreame - -_LOGGER = logging.getLogger(__name__) - - -class MapDataHeader: - def __init__(self): - self.map_index: Optional[int] = None - self.frame_type: Optional[int] = None - self.vacuum_position: Optional[Point] = None - self.charger_position: Optional[Point] = None - self.image_pixel_size: Optional[int] = None - self.image_width: Optional[int] = None - self.image_height: Optional[int] = None - self.image_left: Optional[int] = None - self.image_top: Optional[int] = None - - -class MapDataParserDreame(MapDataParser): - HEADER_SIZE = 27 - PATH_REGEX = r'(?P[SL])(?P-?\d+),(?P-?\d+)' - - class PathOperators(str, Enum): - START = "S" - RELATIVE_LINE = "L" - - class FrameTypes(IntEnum): - I_FRAME = 73 - P_FRAME = 80 - - class MapDataTypes(str, Enum): - REGULAR = "regular" - RISM = "rism" # Room - information - - @staticmethod - def decode_map(raw_map: str, colors, drawables, texts, sizes, image_config, - map_data_type=MapDataTypes.REGULAR) -> MapData: - _LOGGER.debug(f'decoding {map_data_type} type map') - raw_map_string = raw_map.replace('_', '/').replace('-', '+') - unzipped = zlib.decompress(base64.decodebytes(raw_map_string.encode("utf8"))) - return MapDataParserDreame.parse(unzipped, colors, drawables, texts, sizes, image_config, map_data_type) - - @staticmethod - def parse(raw: bytes, colors, drawables, texts, sizes, image_config, - map_data_type: MapDataTypes = MapDataTypes.REGULAR, *args, **kwargs) -> Optional[MapData]: - map_data = MapData(0, 1000) - - header = MapDataParserDreame.parse_header(raw) - - if header.frame_type != MapDataParserDreame.FrameTypes.I_FRAME: - _LOGGER.error("unsupported map frame type") - return - - if len(raw) >= MapDataParserDreame.HEADER_SIZE + header.image_width * header.image_height: - image_raw = raw[MapDataParserDreame.HEADER_SIZE: - MapDataParserDreame.HEADER_SIZE + header.image_width * header.image_height] - additional_data_raw = raw[MapDataParserDreame.HEADER_SIZE + header.image_width * header.image_height:] - additional_data_json = json.loads(additional_data_raw.decode("utf8")) - _LOGGER.debug(f'map additional_data: {additional_data_json}') - - map_data.charger = header.charger_position - map_data.vacuum_position = header.vacuum_position - - map_data.image, map_data.rooms = MapDataParserDreame.parse_image(image_raw, header, colors, image_config, - additional_data_json, map_data_type) - - if additional_data_json.get("rism") and \ - additional_data_json.get("ris") and additional_data_json["ris"] == 2: - rism_map_data = MapDataParserDreame.decode_map( - additional_data_json["rism"], - colors, - drawables, - texts, - sizes, - image_config, - MapDataParserDreame.MapDataTypes.RISM - ) - map_data.no_go_areas = rism_map_data.no_go_areas - map_data.no_mopping_areas = rism_map_data.no_mopping_areas - map_data.walls = rism_map_data.walls - map_data.rooms = rism_map_data.rooms - _LOGGER.debug(f"rooms: {map_data.rooms}") - - if not rism_map_data.image.is_empty: - map_data.image = rism_map_data.image - - if additional_data_json.get("tr"): - map_data.path = MapDataParserDreame.parse_path(additional_data_json["tr"]) - - if additional_data_json.get("vw"): - if additional_data_json["vw"].get("rect"): - map_data.no_go_areas = MapDataParserDreame.parse_areas(additional_data_json["vw"]["rect"]) - if additional_data_json["vw"].get("mop"): - map_data.no_mopping_areas = MapDataParserDreame.parse_areas(additional_data_json["vw"]["mop"]) - if additional_data_json["vw"].get("line"): - map_data.walls = MapDataParserDreame.parse_virtual_walls(additional_data_json["vw"]["line"]) - - if additional_data_json.get("sa") and isinstance(additional_data_json["sa"], list): - active_segment_ids = [sa[0] for sa in additional_data_json["sa"]] - - if not map_data.image.is_empty: - if map_data_type == MapDataParserDreame.MapDataTypes.REGULAR: - MapDataParserDreame.draw_elements(colors, drawables, sizes, map_data, image_config) - ImageHandlerDreame.rotate(map_data.image) - - return map_data - - @staticmethod - def parse_header(raw: bytes) -> Optional[MapDataHeader]: - header = MapDataHeader() - - if not raw or len(raw) < MapDataParserDreame.HEADER_SIZE: - _LOGGER.error("wrong header size for map") - return - - header.map_index = MapDataParserDreame.read_int_16_le(raw) - header.frame_type = MapDataParserDreame.read_int_8(raw, 4) - header.vacuum_position = Point( - MapDataParserDreame.read_int_16_le(raw, 5), - MapDataParserDreame.read_int_16_le(raw, 7), - MapDataParserDreame.read_int_16_le(raw, 9) - ) - header.charger_position = Point( - MapDataParserDreame.read_int_16_le(raw, 11), - MapDataParserDreame.read_int_16_le(raw, 13), - MapDataParserDreame.read_int_16_le(raw, 15) - ) - header.image_pixel_size = MapDataParserDreame.read_int_16_le(raw, 17) - header.image_width = MapDataParserDreame.read_int_16_le(raw, 19) - header.image_height = MapDataParserDreame.read_int_16_le(raw, 21) - header.image_left = round(MapDataParserDreame.read_int_16_le(raw, 23) / header.image_pixel_size) - header.image_top = round(MapDataParserDreame.read_int_16_le(raw, 25) / header.image_pixel_size) - - _LOGGER.debug(f'decoded map header : {header.__dict__}') - - return header - - @staticmethod - def parse_image(image_raw: bytes, header: MapDataHeader, colors, image_config, - additional_data_json, map_data_type: MapDataTypes) -> Tuple[ImageData, Dict[int, Room]]: - - _LOGGER.debug(f"parse image for map {map_data_type}") - image, image_rooms = ImageHandlerDreame.parse(image_raw, header, colors, image_config, map_data_type) - - room_names = {} - if additional_data_json.get("seg_inf"): - room_names = {int(k): base64.b64decode(v.get("name")).decode('utf-8') for (k, v) in - additional_data_json["seg_inf"].items() if - v.get("name")} - - rooms = {k: Room( - k, - (v.x0 + header.image_left) * header.image_pixel_size, - (v.y0 + header.image_top) * header.image_pixel_size, - (v.x1 + header.image_left) * header.image_pixel_size, - (v.y1 + header.image_top) * header.image_pixel_size, - room_names[k] if room_names.get(k) else str(k) - ) for (k, v) in image_rooms.items()} - - return ImageData( - header.image_width * header.image_height, - header.image_top, - header.image_left, - header.image_height, - header.image_width, - image_config, - image, - lambda p: MapDataParserDreame.map_to_image(p, header.image_pixel_size) - ), rooms - - @staticmethod - def map_to_image(p: Point, image_pixel_size: int) -> Point: - return Point( - p.x / image_pixel_size, - p.y / image_pixel_size - ) - - @staticmethod - def parse_path(path_string: str) -> Path: - r = re.compile(MapDataParserDreame.PATH_REGEX) - matches = [m.groupdict() for m in r.finditer(path_string)] - - current_path = [] - path_points = [] - current_position = Point(0, 0) - for match in matches: - if match["operator"] == MapDataParserDreame.PathOperators.START: - current_path = [] - path_points.append(current_path) - current_position = Point(int(match["x"]), int(match["y"])) - elif match["operator"] == MapDataParserDreame.PathOperators.RELATIVE_LINE: - current_position = Point(current_position.x + int(match["x"]), current_position.y + int(match["y"])) - else: - _LOGGER.error(f'invalid path operator {match["operator"]}') - current_path.append(current_position) - - return Path(None, None, None, path_points) - - @staticmethod - def parse_areas(areas: list) -> List[Area]: - parsed_areas = [] - for area in areas: - x_coords = sorted([area[0], area[2]]) - y_coords = sorted([area[1], area[3]]) - parsed_areas.append( - Area( - x_coords[0], y_coords[0], - x_coords[1], y_coords[0], - x_coords[1], y_coords[1], - x_coords[0], y_coords[1] - ) - ) - return parsed_areas - - @staticmethod - def parse_virtual_walls(virtual_walls: list) -> List[Wall]: - return [Wall(virtual_wall[0], virtual_wall[1], virtual_wall[2], virtual_wall[3]) - for virtual_wall in virtual_walls] - - @staticmethod - def read_int_8(data: bytes, offset: int = 0): - return int.from_bytes(data[offset:offset + 1], byteorder='big', signed=True) - - @staticmethod - def read_int_8_le(data: bytes, offset: int = 0): - return int.from_bytes(data[offset:offset + 1], byteorder='little', signed=True) - - @staticmethod - def read_int_16(data: bytes, offset: int = 0): - return int.from_bytes(data[offset:offset + 2], byteorder='big', signed=True) - - @staticmethod - def read_int_16_le(data: bytes, offset: int = 0): - return int.from_bytes(data[offset:offset + 2], byteorder='little', signed=True) diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py deleted file mode 100644 index 35d9ad7..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py +++ /dev/null @@ -1,24 +0,0 @@ -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2 -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector -from custom_components.xiaomi_cloud_map_extractor.dreame.map_data_parser import MapDataParserDreame -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts - - -class DreameVacuum(XiaomiCloudVacuumV2): - - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): - super().__init__(connector, country, user_id, device_id, model) - - def get_map_archive_extension(self) -> str: - return "b64" - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> MapData: - raw_map_string = raw_map.decode() - return MapDataParserDreame.decode_map(raw_map_string, colors, drawables, texts, sizes, image_config) diff --git a/custom_components/xiaomi_cloud_map_extractor/manifest.json b/custom_components/xiaomi_cloud_map_extractor/manifest.json index 626c221..6fb9532 100644 --- a/custom_components/xiaomi_cloud_map_extractor/manifest.json +++ b/custom_components/xiaomi_cloud_map_extractor/manifest.json @@ -8,12 +8,17 @@ "@PiotrMachowski" ], "requirements": [ - "pillow", + "Pillow", "pybase64", "python-miio", "requests", - "pycryptodome" + "pycryptodome", + "vacuum-map-parser-base", + "vacuum-map-parser-roborock", + "vacuum-map-parser-viomi", + "vacuum-map-parser-roidmi", + "vacuum-map-parser-dreame" ], - "version": "v2.2.0", + "version": "v3.0.0-beta", "iot_class": "cloud_polling" } diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/__init__.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py deleted file mode 100644 index ff4d685..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py +++ /dev/null @@ -1,67 +0,0 @@ -import logging -from typing import Dict, List, Tuple - -from PIL import Image -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig - -_LOGGER = logging.getLogger(__name__) - - -class ImageHandlerRoidmi(ImageHandler): - MAP_WALL = 0 - MAP_OUTSIDE = 127 - MAP_UNKNOWN = 255 - - @staticmethod - def parse(raw_data: bytes, width: int, height: int, colors: Colors, image_config: ImageConfig, - room_numbers: List[int]) \ - -> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]]]: - rooms = {} - scale = image_config[CONF_SCALE] - trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100) - trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100) - trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100) - trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100) - trimmed_height = height - trim_top - trim_bottom - trimmed_width = width - trim_left - trim_right - if trimmed_width == 0 or trimmed_height == 0: - return ImageHandler.create_empty_map_image(colors), rooms - image = Image.new('RGBA', (trimmed_width, trimmed_height)) - pixels = image.load() - unknown_pixels = set() - for img_y in range(trimmed_height): - for img_x in range(trimmed_width): - pixel_type = raw_data[img_x + trim_left + width * (img_y + trim_bottom)] - x = img_x - y = trimmed_height - 1 - img_y - if pixel_type == ImageHandlerRoidmi.MAP_OUTSIDE: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors) - elif pixel_type == ImageHandlerRoidmi.MAP_WALL: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL_V2, colors) - elif pixel_type == ImageHandlerRoidmi.MAP_UNKNOWN: - pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors) - elif pixel_type in room_numbers: - room_x = img_x + trim_left - room_y = img_y + trim_bottom - room_number = pixel_type - if room_number not in rooms: - rooms[room_number] = (room_x, room_y, room_x, room_y) - else: - rooms[room_number] = (min(rooms[room_number][0], room_x), - min(rooms[room_number][1], room_y), - max(rooms[room_number][2], room_x), - max(rooms[room_number][3], room_y)) - default = ImageHandler.ROOM_COLORS[room_number % len(ImageHandler.ROOM_COLORS)] - pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{room_number}", colors, default) - else: - pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors) - unknown_pixels.add(pixel_type) - if image_config["scale"] != 1 and trimmed_width != 0 and trimmed_height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - if len(unknown_pixels) > 0: - _LOGGER.warning('unknown pixel_types: %s', unknown_pixels) - return image, rooms diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py deleted file mode 100644 index c50f077..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py +++ /dev/null @@ -1,169 +0,0 @@ -import json -import logging -import math -from typing import Dict, List, Optional, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \ - Wall -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.roidmi.image_handler import ImageHandlerRoidmi - -_LOGGER = logging.getLogger(__name__) - - -class MapDataParserRoidmi(MapDataParser): - - @staticmethod - def parse(raw: bytes, colors, drawables, texts, sizes, image_config, *args, **kwargs) -> MapData: - scale = float(image_config[CONF_SCALE]) - map_image_size = raw.find(bytes([127, 123])) - map_image = raw[16:map_image_size + 1] - map_info_raw = raw[map_image_size + 1:] - map_info = json.loads(map_info_raw) - width = map_info["width"] - height = map_info["height"] - x_min = map_info["x_min"] - y_min = map_info["y_min"] - resolution = map_info["resolution"] - x_min_calc = x_min / resolution - y_min_calc = y_min / resolution - map_data = MapData(0, 1000) - map_data.rooms = MapDataParserRoidmi.parse_rooms(map_info) - image = MapDataParserRoidmi.parse_image(map_image, width, height, x_min_calc, y_min_calc, resolution, - colors, image_config, map_data.rooms) - map_data.image = image - map_data.path = MapDataParserRoidmi.parse_path(map_info) - map_data.vacuum_position = MapDataParserRoidmi.parse_vacuum_position(map_info) - map_data.charger = MapDataParserRoidmi.parse_charger_position(map_info) - map_data.no_go_areas, map_data.no_mopping_areas, map_data.walls = MapDataParserRoidmi.parse_areas(map_info) - if not map_data.image.is_empty: - MapDataParserRoidmi.draw_elements(colors, drawables, sizes, map_data, image_config) - if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - map_data.vacuum_room = MapDataParserRoidmi.get_current_vacuum_room(map_image, map_data, width) - if map_data.vacuum_room is not None: - map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name - ImageHandlerRoidmi.rotate(map_data.image) - ImageHandlerRoidmi.draw_texts(map_data.image, texts) - return map_data - - @staticmethod - def get_current_vacuum_room(map_image: bytes, map_data: MapData, original_width: int) -> Optional[int]: - p = map_data.image.dimensions.img_transformation(map_data.vacuum_position) - room_number = map_image[int(p.x) + int(p.y) * original_width] - if room_number in map_data.rooms: - return room_number - return None - - @staticmethod - def map_to_image(p: Point, resolution, min_x, min_y) -> Point: - return Point(p.x / 1000 / resolution - min_x, p.y / 1000 / resolution - min_y) - - @staticmethod - def image_to_map(p: Point, resolution, min_x, min_y) -> Point: - return Point((p.x + min_x) * resolution * 1000, (p.y + min_y) * resolution * 1000) - - @staticmethod - def parse_image(map_image: bytes, width: int, height: int, min_x: float, min_y: float, resolution: float, - colors: Dict, image_config: Dict, rooms: Dict[int, Room]) -> ImageData: - image_top = 0 - image_left = 0 - room_numbers = list(rooms.keys()) - image, rooms_raw = ImageHandlerRoidmi.parse(map_image, width, height, colors, image_config, room_numbers) - for number, room in rooms_raw.items(): - pf = lambda p: MapDataParserRoidmi.image_to_map(p, resolution, min_x, min_y) - p1 = pf(Point(room[0] + image_left, room[1] + image_top)) - p2 = pf(Point(room[2] + image_left, room[3] + image_top)) - rooms[number].x0 = p1.x - rooms[number].y0 = p1.y - rooms[number].x1 = p2.x - rooms[number].y1 = p2.y - return ImageData(width * height, image_top, image_left, height, width, image_config, image, - lambda p: MapDataParserRoidmi.map_to_image(p, resolution, min_x, min_y)) - - @staticmethod - def parse_path(map_info: dict) -> Path: - path_points = [] - if "posArray" in map_info: - raw_points = json.loads(map_info["posArray"]) - for raw_point in raw_points: - point = Point(raw_point[0], raw_point[1]) - path_points.append(point) - return Path(None, None, None, [path_points]) - - @staticmethod - def parse_vacuum_position(map_info: dict) -> Point: - vacuum_position = MapDataParserRoidmi.parse_position(map_info, "robotPos", "robotPos", "robotPhi") - if vacuum_position is None: - vacuum_position = MapDataParserRoidmi.parse_position(map_info, "posX", "posY", "posPhi") - return vacuum_position - - @staticmethod - def parse_charger_position(map_info: dict) -> Point: - return MapDataParserRoidmi.parse_position(map_info, "chargeHandlePos", "chargeHandlePos", "chargeHandlePhi") - - @staticmethod - def parse_position(map_info: dict, x_label: str, y_label: str, a_label: str) -> Optional[Point]: - position = None - if x_label not in map_info or y_label not in map_info: - return position - x = map_info[x_label] - y = map_info[y_label] - a = None - if x_label == y_label: - x = x[0] - y = y[1] - if a_label in map_info: - a = map_info[a_label] / 1000 * 180 / math.pi - position = Point(x, y, a) - return position - - @staticmethod - def parse_rooms(map_info: dict) -> Dict[int, Room]: - rooms = {} - areas = [] - if "autoArea" in map_info: - areas = map_info["autoArea"] - elif "autoAreaValue" in map_info and map_info["autoAreaValue"] is not None: - areas = map_info["autoAreaValue"] - for area in areas: - id = area["id"] - name = area["name"] - pos_x = area["pos"][0] if "pos" in area else None - pos_y = area["pos"][1] if "pos" in area else None - rooms[id] = Room(id, None, None, None, None, name, pos_x, pos_y) - return rooms - - @staticmethod - def parse_areas(map_info: dict) -> Tuple[List[Area], List[Area], List[Wall]]: - no_go_areas = [] - no_mopping_areas = [] - walls = [] - if "area" in map_info: - areas = map_info["area"] - for area in areas: - if "active" in area and area["active"] == "forbid" and "vertexs" in area and len(area["vertexs"]) == 4: - vertexs = area["vertexs"] - x0 = vertexs[0][0] - y0 = vertexs[0][1] - x1 = vertexs[1][0] - y1 = vertexs[1][1] - x2 = vertexs[2][0] - y2 = vertexs[2][1] - x3 = vertexs[3][0] - y3 = vertexs[3][1] - no_area = Area(x0, y0, x1, y1, x2, y2, x3, y3) - if "forbidType" in area and area["forbidType"] == "mop": - no_mopping_areas.append(no_area) - if "forbidType" in area and area["forbidType"] == "all": - no_go_areas.append(no_area) - if "active" in area and area["active"] == "forbid" and "vertexs" in area and len(area["vertexs"]) == 2: - vertexs = area["vertexs"] - x0 = vertexs[0][0] - y0 = vertexs[0][1] - x1 = vertexs[1][0] - y1 = vertexs[1][1] - wall = Wall(x0, y0, x1, y1) - if "forbidType" in area and area["forbidType"] == "all": - walls.append(wall) - return no_go_areas, no_mopping_areas, walls diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py deleted file mode 100644 index ce9b24d..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py +++ /dev/null @@ -1,26 +0,0 @@ -import gzip - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2 -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector -from custom_components.xiaomi_cloud_map_extractor.roidmi.map_data_parser import MapDataParserRoidmi -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts - - -class RoidmiVacuum(XiaomiCloudVacuumV2): - - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): - super().__init__(connector, country, user_id, device_id, model) - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> MapData: - unzipped = gzip.decompress(raw_map) - return MapDataParserRoidmi.parse(unzipped, colors, drawables, texts, sizes, image_config) - - def get_map_archive_extension(self) -> str: - return "gz" diff --git a/custom_components/xiaomi_cloud_map_extractor/types.py b/custom_components/xiaomi_cloud_map_extractor/types.py deleted file mode 100644 index 6c00ec2..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/types.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Any, Dict, List, Tuple, Union - -Color = Union[Tuple[int, int, int], Tuple[int, int, int, int]] -Colors = Dict[str, Color] -Drawables = List[str] -Texts = List[Any] -Sizes = Dict[str, float] -ImageConfig = Dict[str, Any] -CalibrationPoints = List[Dict[str, Dict[str, Union[float, int]]]] diff --git a/custom_components/xiaomi_cloud_map_extractor/unsupported/__init__.py b/custom_components/xiaomi_cloud_map_extractor/unsupported/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/custom_components/xiaomi_cloud_map_extractor/unsupported/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/unsupported/vacuum.py deleted file mode 100644 index 928a2ab..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/unsupported/vacuum.py +++ /dev/null @@ -1,10 +0,0 @@ -from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2 - - -class UnsupportedVacuum(XiaomiCloudVacuumV2): - - def __init__(self, connector, country, user_id, device_id, model): - super().__init__(connector, country, user_id, device_id, model) - - def get_map_archive_extension(self): - return "unknown" diff --git a/custom_components/xiaomi_cloud_map_extractor/common/__init__.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/__init__.py similarity index 100% rename from custom_components/xiaomi_cloud_map_extractor/common/__init__.py rename to custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/__init__.py diff --git a/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_base.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_base.py new file mode 100644 index 0000000..48bbfec --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_base.py @@ -0,0 +1,99 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass + +from vacuum_map_parser_base.config.color import ColorsPalette +from vacuum_map_parser_base.config.drawable import Drawable +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.config.size import Sizes +from vacuum_map_parser_base.config.text import Text +from vacuum_map_parser_base.map_data import MapData +from vacuum_map_parser_base.map_data_parser import MapDataParser + +from .xiaomi_cloud_connector import XiaomiCloudConnector + + +@dataclass +class VacuumConfig: + connector: XiaomiCloudConnector + country: str + user_id: str + device_id: str + host: str + token: str + model: str + palette: ColorsPalette + drawables: list[Drawable] + image_config: ImageConfig + sizes: Sizes + texts: list[Text] + store_map_path: str | None + + +class XiaomiCloudVacuum(ABC): + + def __init__(self, vacuum_config: VacuumConfig): + self.model = vacuum_config.model + self._connector = vacuum_config.connector + self._country = vacuum_config.country + self._user_id = vacuum_config.user_id + self._device_id = vacuum_config.device_id + self._palette = vacuum_config.palette + self._drawables = vacuum_config.drawables + self._image_config = vacuum_config.image_config + self._sizes = vacuum_config.sizes + self._texts = vacuum_config.texts + self._store_map_path = vacuum_config.store_map_path + + @property + @abstractmethod + def map_archive_extension(self) -> str: + pass + + @property + @abstractmethod + def should_get_map_from_vacuum(self) -> bool: + pass + + @property + def should_update_map(self) -> bool: + return True + + @property + @abstractmethod + def map_data_parser(self) -> MapDataParser: + pass + + def decode_and_parse(self, raw_map: bytes) -> MapData: + decoded_map = self.map_data_parser.unpack_map(raw_map) + return self.map_data_parser.parse(decoded_map) + + @abstractmethod + def get_map_url(self, map_name: str) -> str | None: + pass + + def get_map_name(self) -> str | None: + return "0" + + def get_map(self) -> tuple[MapData | None, bool]: + map_name = self.get_map_name() + raw_map_data = self.get_raw_map_data(map_name) + if raw_map_data is None: + return None, False + map_stored = False + if self._store_map_path is not None: + self.store_map(raw_map_data) + map_stored = True + map_data = self.decode_and_parse(raw_map_data) + if map_data is not None: + map_data.map_name = map_name + return map_data, map_stored + + def get_raw_map_data(self, map_name: str | None) -> bytes | None: + if map_name is None: + return None + map_url = self.get_map_url(map_name) + return self._connector.get_raw_map_data(map_url) + + def store_map(self, raw_map_data): + with open(f"{self._store_map_path}/map_data_{self.model}.{self.map_archive_extension}", "wb") as raw_map_file: + raw_map_file.write(raw_map_data) diff --git a/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_dreame.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_dreame.py new file mode 100644 index 0000000..31a3e64 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_dreame.py @@ -0,0 +1,72 @@ +from vacuum_map_parser_base.map_data import MapData +from vacuum_map_parser_dreame.map_data_parser import DreameMapDataParser + +from .vacuum_base import VacuumConfig +from .vacuum_v2 import XiaomiCloudVacuumV2 + + +class DreameCloudVacuum(XiaomiCloudVacuumV2): + + def __init__(self, vacuum_config: VacuumConfig): + super().__init__(vacuum_config) + self._dreame_map_data_parser = DreameMapDataParser( + vacuum_config.palette, + vacuum_config.sizes, + vacuum_config.drawables, + vacuum_config.image_config, + vacuum_config.texts, + vacuum_config.model, + ) + self._robot_stamp = 0 + self._enc_key = None + + @property + def map_archive_extension(self) -> str: + if self.model in DreameMapDataParser.IVs: + return "enc.b64" + return "b64" + + @property + def map_data_parser(self) -> DreameMapDataParser: + return self._dreame_map_data_parser + + def decode_and_parse(self, raw_map: bytes) -> MapData: + decoded_map = self.map_data_parser.unpack_map(raw_map, enckey=self._enc_key) + return self.map_data_parser.parse(decoded_map) + + def get_map_name(self) -> str | None: + if self.model in DreameMapDataParser.IVs: + if self._robot_stamp != 0: + parameters = [{'piid': 2, 'value': '{"req_type":1,"frame_type":"I","time":%d}' % self._robot_stamp}] + else: + parameters = [{'piid': 2, 'value': '{"req_type":1,"frame_type":"I"}'}] + + response = self._connector.get_other_info(self._device_id, "action", parameters={ + "did": self._device_id, + "siid": 6, + "aiid": 1, + "in": parameters, + }) + + if response is None: + return None + + _key = response["result"]["out"][1]["value"] + + if len(_key) == 0: + robotstamp = response["result"]["out"][2]["value"] + self._robot_stamp = robotstamp + return None + + _map_name = _key.split(",") + map_name = _map_name[0].split("/")[2] + self._enc_key = _map_name[1] + self._robot_stamp = 0 + return map_name + else: + return super().get_map_name() + + def store_map(self, raw_map_data): + super().store_map(raw_map_data) + with open(f"{self._store_map_path}/map_data_{self.model}.enc.key", "w") as enc_key_file: + enc_key_file.write(self._enc_key) diff --git a/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_roborock.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_roborock.py new file mode 100644 index 0000000..937f11d --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_roborock.py @@ -0,0 +1,79 @@ +import logging +import time + +from miio import RoborockVacuum, DeviceException +from vacuum_map_parser_roborock.map_data_parser import RoborockMapDataParser + +from .vacuum_base import VacuumConfig, XiaomiCloudVacuum + +_LOGGER = logging.getLogger(__name__) + + +class RoborockCloudVacuum(XiaomiCloudVacuum): + + def __init__(self, vacuum_config: VacuumConfig): + super().__init__(vacuum_config) + self._roborock_map_data_parser = RoborockMapDataParser( + vacuum_config.palette, + vacuum_config.sizes, + vacuum_config.drawables, + vacuum_config.image_config, + vacuum_config.texts + ) + self._vacuum = RoborockVacuum(vacuum_config.host, vacuum_config.token) + + def get_map_url(self, map_name: str) -> str | None: + url = self._connector.get_api_url(self._country) + "/home/getmapfileurl" + params = { + "data": '{"obj_name":"' + map_name + '"}' + } + api_response = self._connector.execute_api_call_encrypted(url, params) + if ( + api_response is None + or "result" not in api_response + or api_response["result"] is None + or "url" not in api_response["result"]): + return None + return api_response["result"]["url"] + + @property + def should_get_map_from_vacuum(self) -> bool: + return True + + @property + def should_update_map(self) -> bool: + return self._vacuum.status().state_code not in [ + 2, # Charger disconnected + 3, # Idle + 8, # Charging + 9, # Charging problem + 13, # Shutting down + 14, # Updating + 100, # Charging complete + 101, # Device offline + ] + + @property + def map_archive_extension(self) -> str: + return "gz" + + @property + def map_data_parser(self) -> RoborockMapDataParser: + return self._roborock_map_data_parser + + def get_map_name(self) -> str: + map_name = "retry" + counter = 10 + while map_name == "retry" and counter > 0: + _LOGGER.debug("Retrieving map name from device") + time.sleep(0.1) + try: + map_name = self._vacuum.map()[0] + _LOGGER.debug("Map name %s", map_name) + except OSError as exc: + _LOGGER.error("Got OSError while fetching the state: %s", exc) + except DeviceException as exc: + _LOGGER.warning("Got exception while fetching the state: %s", exc) + finally: + counter = counter - 1 + return map_name diff --git a/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_roidmi.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_roidmi.py new file mode 100644 index 0000000..8e45bd7 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_roidmi.py @@ -0,0 +1,24 @@ +from vacuum_map_parser_roidmi.map_data_parser import RoidmiMapDataParser + +from .vacuum_base import VacuumConfig +from .vacuum_v2 import XiaomiCloudVacuumV2 + + +class RoidmiCloudVacuum(XiaomiCloudVacuumV2): + + def __init__(self, vacuum_config: VacuumConfig): + super().__init__(vacuum_config) + self._roidmi_map_data_parser = RoidmiMapDataParser( + vacuum_config.palette, + vacuum_config.sizes, + vacuum_config.drawables, + vacuum_config.image_config, + vacuum_config.texts + ) + + def map_archive_extension(self) -> str: + return "gz" + + @property + def map_data_parser(self) -> RoidmiMapDataParser: + return self._roidmi_map_data_parser diff --git a/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_unsupported.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_unsupported.py new file mode 100644 index 0000000..b08fbe7 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_unsupported.py @@ -0,0 +1,29 @@ +from vacuum_map_parser_base.map_data import MapData +from vacuum_map_parser_base.map_data_parser import MapDataParser + +from .vacuum_base import VacuumConfig +from .vacuum_v2 import XiaomiCloudVacuumV2 + + +class UnsupportedCloudVacuum(XiaomiCloudVacuumV2): + + def __init__(self, vacuum_config: VacuumConfig): + super().__init__(vacuum_config) + self._unsupported_map_data_parser = MapDataParser( + vacuum_config.palette, + vacuum_config.sizes, + vacuum_config.drawables, + vacuum_config.image_config, + vacuum_config.texts + ) + + @property + def map_archive_extension(self): + return "unknown" + + @property + def map_data_parser(self) -> MapDataParser: + return self._unsupported_map_data_parser + + def decode_and_parse(self, raw_map: bytes) -> MapData | None: + return self._unsupported_map_data_parser.create_empty(f"Vacuum\n{self.model}\nis not supported") diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_v2.py similarity index 50% rename from custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py rename to custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_v2.py index 6375a53..f133391 100644 --- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_v2.py @@ -1,15 +1,15 @@ -from typing import Optional +from abc import ABC -from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector +from .vacuum_base import XiaomiCloudVacuum -class XiaomiCloudVacuumV2(XiaomiCloudVacuum): +class XiaomiCloudVacuumV2(XiaomiCloudVacuum, ABC): - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): - super().__init__(connector, country, user_id, device_id, model) + @property + def should_get_map_from_vacuum(self) -> bool: + return False - def get_map_url(self, map_name: str) -> Optional[str]: + def get_map_url(self, map_name: str) -> str | None: url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url' params = { "data": f'{{"obj_name":"{self._user_id}/{self._device_id}/{map_name}"}}' @@ -18,6 +18,3 @@ def get_map_url(self, map_name: str) -> Optional[str]: if api_response is None or "result" not in api_response or "url" not in api_response["result"]: return None return api_response["result"]["url"] - - def should_get_map_from_vacuum(self) -> bool: - return False diff --git a/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_viomi.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_viomi.py new file mode 100644 index 0000000..81813c2 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/vacuum_viomi.py @@ -0,0 +1,25 @@ +from vacuum_map_parser_viomi.map_data_parser import ViomiMapDataParser + +from .vacuum_base import VacuumConfig +from .vacuum_v2 import XiaomiCloudVacuumV2 + + +class ViomiCloudVacuum(XiaomiCloudVacuumV2): + + def __init__(self, vacuum_config: VacuumConfig): + super().__init__(vacuum_config) + self._viomi_map_data_parser = ViomiMapDataParser( + vacuum_config.palette, + vacuum_config.sizes, + vacuum_config.drawables, + vacuum_config.image_config, + vacuum_config.texts + ) + + @property + def map_archive_extension(self) -> str: + return "zlib" + + @property + def map_data_parser(self) -> ViomiMapDataParser: + return self._viomi_map_data_parser diff --git a/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/xiaomi_cloud_connector.py similarity index 89% rename from custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py rename to custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/xiaomi_cloud_connector.py index 94fb641..869efd9 100644 --- a/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py +++ b/custom_components/xiaomi_cloud_map_extractor/vacuum_platforms/xiaomi_cloud_connector.py @@ -6,14 +6,13 @@ import os import random import time -from typing import Any, Dict, Optional, Tuple from Crypto.Cipher import ARC4 import requests -from custom_components.xiaomi_cloud_map_extractor.const import * _LOGGER = logging.getLogger(__name__) +CONF_AVAILABLE_COUNTRIES = ["cn", "de", "us", "ru", "tw", "sg", "in", "i2"] # noinspection PyBroadException @@ -34,6 +33,7 @@ def __init__(self, username: str, password: str): self._location = None self._code = None self._serviceToken = None + self.country = None def login_step_1(self) -> bool: url = "https://account.xiaomi.com/pass/serviceLogin?sid=xiaomiio&_json=true" @@ -121,7 +121,7 @@ def login(self) -> bool: self._session.cookies.set("deviceId", self._device_id, domain="xiaomi.com") return self.login_step_1() and self.login_step_2() and self.login_step_3() - def get_raw_map_data(self, map_url) -> Optional[bytes]: + def get_raw_map_data(self, map_url: str | None) -> bytes | None: if map_url is not None: try: response = self._session.get(map_url, timeout=10) @@ -132,7 +132,7 @@ def get_raw_map_data(self, map_url) -> Optional[bytes]: return None def get_device_details(self, token: str, - country: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + country: str) -> tuple[str | None, str | None, str | None, str | None]: countries_to_check = CONF_AVAILABLE_COUNTRIES if country is not None: countries_to_check = [country] @@ -146,17 +146,26 @@ def get_device_details(self, token: str, user_id = found[0]["uid"] device_id = found[0]["did"] model = found[0]["model"] + self.country = country return country, user_id, device_id, model return None, None, None, None - def get_devices(self, country: str) -> Any: + def get_devices(self, country: str) -> any: url = self.get_api_url(country) + "/home/device_list" params = { "data": '{"getVirtualModel":false,"getHuamiDevices":0}' } return self.execute_api_call_encrypted(url, params) - def execute_api_call_encrypted(self, url: str, params: Dict[str, str]) -> Any: + def get_other_info(self, device_id: str, method: str, parameters: dict) -> any: + url = self.get_api_url('sg') + "/v2/home/rpc/" + device_id + params = { + "data": json.dumps({"method": method, "params": parameters}, separators=(",", ":")) + } + return self.execute_api_call_encrypted(url, params) + + + def execute_api_call_encrypted(self, url: str, params: dict[str, str]) -> any: headers = { "Accept-Encoding": "identity", "User-Agent": self._agent, @@ -188,7 +197,9 @@ def execute_api_call_encrypted(self, url: str, params: Dict[str, str]) -> Any: return json.loads(decoded) return None - def get_api_url(self, country: str) -> str: + def get_api_url(self, country: str | None = None) -> str: + if country is None: + country = self.country return "https://" + ("" if country == "cn" else (country + ".")) + "api.io.mi.com/app" def signed_nonce(self, nonce: str) -> str: @@ -210,7 +221,7 @@ def generate_device_id() -> str: return "".join((chr(random.randint(97, 122)) for _ in range(6))) @staticmethod - def generate_signature(url, signed_nonce: str, nonce: str, params: Dict[str, str]) -> str: + def generate_signature(url, signed_nonce: str, nonce: str, params: dict[str, str]) -> str: signature_params = [url.split("com")[1], signed_nonce, nonce] for k, v in params.items(): signature_params.append(f"{k}={v}") @@ -219,7 +230,7 @@ def generate_signature(url, signed_nonce: str, nonce: str, params: Dict[str, str return base64.b64encode(signature.digest()).decode() @staticmethod - def generate_enc_signature(url, method: str, signed_nonce: str, params: Dict[str, str]) -> str: + def generate_enc_signature(url, method: str, signed_nonce: str, params: dict[str, str]) -> str: signature_params = [str(method).upper(), url.split("com")[1].replace("/app/", "/")] for k, v in params.items(): signature_params.append(f"{k}={v}") @@ -228,8 +239,8 @@ def generate_enc_signature(url, method: str, signed_nonce: str, params: Dict[str return base64.b64encode(hashlib.sha1(signature_string.encode('utf-8')).digest()).decode() @staticmethod - def generate_enc_params(url: str, method: str, signed_nonce: str, nonce: str, params: Dict[str, str], - ssecurity: str) -> Dict[str, str]: + def generate_enc_params(url: str, method: str, signed_nonce: str, nonce: str, params: dict[str, str], + ssecurity: str) -> dict[str, str]: params['rc4_hash__'] = XiaomiCloudConnector.generate_enc_signature(url, method, signed_nonce, params) for k, v in params.items(): params[k] = XiaomiCloudConnector.encrypt_rc4(signed_nonce, v) @@ -241,7 +252,7 @@ def generate_enc_params(url: str, method: str, signed_nonce: str, nonce: str, pa return params @staticmethod - def to_json(response_text: str) -> Any: + def to_json(response_text: str) -> any: return json.loads(response_text.replace("&&&START&&&", "")) @staticmethod diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/__init__.py b/custom_components/xiaomi_cloud_map_extractor/viomi/__init__.py deleted file mode 100644 index 9ac5608..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Functionalities specific for Viomi vacuums.""" diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py deleted file mode 100644 index 538d3a9..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py +++ /dev/null @@ -1,94 +0,0 @@ -import logging -from typing import Dict, Optional, Set, Tuple - -from PIL import Image -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig -from custom_components.xiaomi_cloud_map_extractor.viomi.parsing_buffer import ParsingBuffer - -_LOGGER = logging.getLogger(__name__) - - -class ImageHandlerViomi(ImageHandler): - MAP_OUTSIDE = 0x00 - MAP_WALL = 0xff - MAP_SCAN = 0x01 - MAP_NEW_DISCOVERED_AREA = 0x02 - MAP_ROOM_MIN = 10 - MAP_ROOM_MAX = 59 - MAP_SELECTED_ROOM_MIN = 60 - MAP_SELECTED_ROOM_MAX = 109 - - @staticmethod - def parse(buf: ParsingBuffer, width: int, height: int, colors: Colors, image_config: ImageConfig, - draw_cleaned_area: bool) \ - -> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]], Set[int], Optional[ImageType]]: - rooms = {} - cleaned_areas = set() - scale = image_config[CONF_SCALE] - trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100) - trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100) - trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100) - trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100) - trimmed_height = height - trim_top - trim_bottom - trimmed_width = width - trim_left - trim_right - if trimmed_width == 0 or trimmed_height == 0: - return ImageHandler.create_empty_map_image(colors), rooms, cleaned_areas, None - image = Image.new('RGBA', (trimmed_width, trimmed_height)) - pixels = image.load() - cleaned_areas_layer = None - cleaned_areas_pixels = None - if draw_cleaned_area: - cleaned_areas_layer = Image.new('RGBA', (trimmed_width, trimmed_height)) - cleaned_areas_pixels = cleaned_areas_layer.load() - buf.skip('trim_bottom', trim_bottom * width) - unknown_pixels = set() - for img_y in range(trimmed_height): - buf.skip('trim_left', trim_left) - for img_x in range(trimmed_width): - pixel_type = buf.get_uint8('pixel') - x = img_x - y = trimmed_height - 1 - img_y - if pixel_type == ImageHandlerViomi.MAP_OUTSIDE: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors) - elif pixel_type == ImageHandlerViomi.MAP_WALL: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL_V2, colors) - elif pixel_type == ImageHandlerViomi.MAP_SCAN: - pixels[x, y] = ImageHandler.__get_color__(COLOR_SCAN, colors) - elif pixel_type == ImageHandlerViomi.MAP_NEW_DISCOVERED_AREA: - pixels[x, y] = ImageHandler.__get_color__(COLOR_NEW_DISCOVERED_AREA, colors) - elif ImageHandlerViomi.MAP_ROOM_MIN <= pixel_type <= ImageHandlerViomi.MAP_SELECTED_ROOM_MAX: - room_x = img_x + trim_left - room_y = img_y + trim_bottom - if pixel_type < ImageHandlerViomi.MAP_SELECTED_ROOM_MIN: - room_number = pixel_type - else: - room_number = pixel_type - ImageHandlerViomi.MAP_SELECTED_ROOM_MIN + ImageHandlerViomi.MAP_ROOM_MIN - cleaned_areas.add(room_number) - if draw_cleaned_area: - cleaned_areas_pixels[x, y] = ImageHandler.__get_color__(COLOR_CLEANED_AREA, colors) - if room_number not in rooms: - rooms[room_number] = (room_x, room_y, room_x, room_y) - else: - rooms[room_number] = (min(rooms[room_number][0], room_x), - min(rooms[room_number][1], room_y), - max(rooms[room_number][2], room_x), - max(rooms[room_number][3], room_y)) - default = ImageHandler.ROOM_COLORS[room_number % len(ImageHandler.ROOM_COLORS)] - pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{room_number}", colors, default) - else: - pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors) - unknown_pixels.add(pixel_type) - buf.skip('trim_right', trim_right) - buf.skip('trim_top', trim_top * width) - if image_config["scale"] != 1 and trimmed_width != 0 and trimmed_height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - if draw_cleaned_area: - cleaned_areas_layer = cleaned_areas_layer.resize( - (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - if len(unknown_pixels) > 0: - _LOGGER.warning('unknown pixel_types: %s', unknown_pixels) - return image, rooms, cleaned_areas, cleaned_areas_layer diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py deleted file mode 100644 index 2721f87..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py +++ /dev/null @@ -1,269 +0,0 @@ -import logging -import math -from typing import Dict, List, Optional, Set, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \ - Wall, Zone -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts -from custom_components.xiaomi_cloud_map_extractor.viomi.image_handler import ImageHandlerViomi -from custom_components.xiaomi_cloud_map_extractor.viomi.parsing_buffer import ParsingBuffer - -_LOGGER = logging.getLogger(__name__) - - -class MapDataParserViomi(MapDataParser): - FEATURE_ROBOT_STATUS = 0x00000001 - FEATURE_IMAGE = 0x00000002 - FEATURE_HISTORY = 0x00000004 - FEATURE_CHARGE_STATION = 0x00000008 - FEATURE_RESTRICTED_AREAS = 0x00000010 - FEATURE_CLEANING_AREAS = 0x00000020 - FEATURE_NAVIGATE = 0x00000040 - FEATURE_REALTIME = 0x00000080 - FEATURE_ROOMS = 0x00001000 - - POSITION_UNKNOWN = 1100 - - @staticmethod - def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes, - image_config: ImageConfig, *args, **kwargs) -> MapData: - map_data = MapData(0, 1) - buf = ParsingBuffer('header', raw, 0, len(raw)) - feature_flags = buf.get_uint32('feature_flags') - map_id = buf.peek_uint32('map_id') - _LOGGER.debug('feature_flags: 0x%x, map_id: %d', feature_flags, map_id) - - if feature_flags & MapDataParserViomi.FEATURE_ROBOT_STATUS != 0: - MapDataParserViomi.parse_section(buf, 'robot_status', map_id) - buf.skip('unknown1', 0x28) - - if feature_flags & MapDataParserViomi.FEATURE_IMAGE != 0: - MapDataParserViomi.parse_section(buf, 'image', map_id) - map_data.image, map_data.rooms, map_data.cleaned_rooms = \ - MapDataParserViomi.parse_image(buf, colors, image_config, DRAWABLE_CLEANED_AREA in drawables) - - if feature_flags & MapDataParserViomi.FEATURE_HISTORY != 0: - MapDataParserViomi.parse_section(buf, 'history', map_id) - map_data.path = MapDataParserViomi.parse_history(buf) - - if feature_flags & MapDataParserViomi.FEATURE_CHARGE_STATION != 0: - MapDataParserViomi.parse_section(buf, 'charge_station', map_id) - map_data.charger = MapDataParserViomi.parse_position(buf, 'pos', with_angle=True) - _LOGGER.debug('pos: %s', map_data.charger) - - if feature_flags & MapDataParserViomi.FEATURE_RESTRICTED_AREAS != 0: - MapDataParserViomi.parse_section(buf, 'restricted_areas', map_id) - map_data.walls, map_data.no_go_areas = MapDataParserViomi.parse_restricted_areas(buf) - - if feature_flags & MapDataParserViomi.FEATURE_CLEANING_AREAS != 0: - MapDataParserViomi.parse_section(buf, 'cleaning_areas', map_id) - map_data.zones = MapDataParserViomi.parse_cleaning_areas(buf) - - if feature_flags & MapDataParserViomi.FEATURE_NAVIGATE != 0: - MapDataParserViomi.parse_section(buf, 'navigate', map_id) - buf.skip('unknown1', 4) - map_data.goto = MapDataParserViomi.parse_position(buf, 'pos') - foo = buf.get_float32('foo') - _LOGGER.debug('pos: %s, foo: %f', map_data.goto, foo) - - if feature_flags & MapDataParserViomi.FEATURE_REALTIME != 0: - MapDataParserViomi.parse_section(buf, 'realtime', map_id) - buf.skip('unknown1', 5) - map_data.vacuum_position = MapDataParserViomi.parse_position(buf, 'pos', with_angle=True) - _LOGGER.debug('pos: %s', map_data.vacuum_position) - - if feature_flags & 0x00000800 != 0: - MapDataParserViomi.parse_section(buf, 'unknown1', map_id) - MapDataParserViomi.parse_unknown_section(buf) - - if feature_flags & MapDataParserViomi.FEATURE_ROOMS != 0: - MapDataParserViomi.parse_section(buf, 'rooms', map_id) - MapDataParserViomi.parse_rooms(buf, map_data.rooms) - - if feature_flags & 0x00002000 != 0: - MapDataParserViomi.parse_section(buf, 'unknown2', map_id) - MapDataParserViomi.parse_unknown_section(buf) - - if feature_flags & 0x00004000 != 0: - MapDataParserViomi.parse_section(buf, 'room_outlines', map_id) - MapDataParserViomi.parse_room_outlines(buf) - - buf.check_empty() - - if map_data.rooms is not None: - _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()]) - if not map_data.image.is_empty: - MapDataParserViomi.draw_elements(colors, drawables, sizes, map_data, image_config) - if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - map_data.vacuum_room = MapDataParserViomi.get_current_vacuum_room(buf, map_data.vacuum_position) - if map_data.vacuum_room is not None: - map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name - _LOGGER.debug('current vacuum room: %s', map_data.vacuum_room) - ImageHandlerViomi.rotate(map_data.image) - ImageHandlerViomi.draw_texts(map_data.image, texts) - return map_data - - @staticmethod - def map_to_image(p: Point) -> Point: - return Point(p.x * 20 + 400, p.y * 20 + 400) - - @staticmethod - def image_to_map(x: float) -> float: - return (x - 400) / 20 - - @staticmethod - def get_current_vacuum_room(buf: ParsingBuffer, vacuum_position: Point) -> Optional[int]: - vacuum_position_on_image = MapDataParserViomi.map_to_image(vacuum_position) - pixel_type = buf.get_at_image(int(vacuum_position_on_image.y) * 800 + int(vacuum_position_on_image.x)) - if ImageHandlerViomi.MAP_ROOM_MIN <= pixel_type <= ImageHandlerViomi.MAP_ROOM_MAX: - return pixel_type - elif ImageHandlerViomi.MAP_SELECTED_ROOM_MIN <= pixel_type <= ImageHandlerViomi.MAP_SELECTED_ROOM_MAX: - return pixel_type - ImageHandlerViomi.MAP_SELECTED_ROOM_MIN + ImageHandlerViomi.MAP_ROOM_MIN - return None - - @staticmethod - def parse_image(buf: ParsingBuffer, colors: Colors, image_config: ImageConfig, draw_cleaned_area: bool) \ - -> Tuple[ImageData, Dict[int, Room], Set[int]]: - buf.skip('unknown1', 0x08) - image_top = 0 - image_left = 0 - image_height = buf.get_uint32('image_height') - image_width = buf.get_uint32('image_width') - buf.skip('unknown2', 20) - image_size = image_height * image_width - _LOGGER.debug('width: %d, height: %d', image_width, image_height) - if image_width \ - - image_width * (image_config[CONF_TRIM][CONF_LEFT] + image_config[CONF_TRIM][CONF_RIGHT]) / 100 \ - < MINIMAL_IMAGE_WIDTH: - image_config[CONF_TRIM][CONF_LEFT] = 0 - image_config[CONF_TRIM][CONF_RIGHT] = 0 - if image_height \ - - image_height * (image_config[CONF_TRIM][CONF_TOP] + image_config[CONF_TRIM][CONF_BOTTOM]) / 100 \ - < MINIMAL_IMAGE_HEIGHT: - image_config[CONF_TRIM][CONF_TOP] = 0 - image_config[CONF_TRIM][CONF_BOTTOM] = 0 - buf.mark_as_image_beginning() - image, rooms_raw, cleaned_areas, cleaned_areas_layer = ImageHandlerViomi.parse(buf, image_width, image_height, - colors, image_config, - draw_cleaned_area) - _LOGGER.debug('img: number of rooms: %d, numbers: %s', len(rooms_raw), rooms_raw.keys()) - rooms = {} - for number, room in rooms_raw.items(): - rooms[number] = Room(number, MapDataParserViomi.image_to_map(room[0] + image_left), - MapDataParserViomi.image_to_map(room[1] + image_top), - MapDataParserViomi.image_to_map(room[2] + image_left), - MapDataParserViomi.image_to_map(room[3] + image_top)) - return ImageData(image_size, image_top, image_left, image_height, image_width, image_config, - image, MapDataParserViomi.map_to_image, - additional_layers={DRAWABLE_CLEANED_AREA: cleaned_areas_layer}), rooms, cleaned_areas - - @staticmethod - def parse_history(buf: ParsingBuffer) -> Path: - path_points = [] - buf.skip('unknown1', 4) - history_count = buf.get_uint32('history_count') - for _ in range(history_count): - mode = buf.get_uint8('mode') # 0: taxi, 1: working - path_points.append(MapDataParserViomi.parse_position(buf, 'path')) - return Path(len(path_points), 1, 0, [path_points]) - - @staticmethod - def parse_restricted_areas(buf: ParsingBuffer) -> Tuple[List[Wall], List[Area]]: - walls = [] - areas = [] - buf.skip('unknown1', 4) - area_count = buf.get_uint32('area_count') - for _ in range(area_count): - buf.skip('restricted.unknown1', 12) - p1 = MapDataParserViomi.parse_position(buf, 'p1') - p2 = MapDataParserViomi.parse_position(buf, 'p2') - p3 = MapDataParserViomi.parse_position(buf, 'p3') - p4 = MapDataParserViomi.parse_position(buf, 'p4') - buf.skip('restricted.unknown2', 48) - _LOGGER.debug('restricted: %s %s %s %s', p1, p2, p3, p4) - if p1 == p2 and p3 == p4: - walls.append(Wall(p1.x, p1.y, p3.x, p3.y)) - else: - areas.append(Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) - return walls, areas - - @staticmethod - def parse_cleaning_areas(buf: ParsingBuffer) -> List[Zone]: - buf.skip('unknown1', 4) - area_count = buf.get_uint32('area_count') - zones = [] - for _ in range(area_count): - buf.skip('area.unknown1', 12) - p1 = MapDataParserViomi.parse_position(buf, 'p1') - p2 = MapDataParserViomi.parse_position(buf, 'p2') - p3 = MapDataParserViomi.parse_position(buf, 'p3') - p4 = MapDataParserViomi.parse_position(buf, 'p4') - buf.skip('area.unknown2', 48) - if p1 is not None and p3 is not None: - zones.append(Zone(p1.x, p1.y, p3.x, p3.y)) - return zones - - @staticmethod - def parse_rooms(buf: ParsingBuffer, map_data_rooms: Dict[int, Room]): - map_name = buf.get_string_len8('map_name') - map_arg = buf.get_uint32('map_arg') - _LOGGER.debug('map#%d: %s', map_arg, map_name) - while map_arg > 1: - map_name = buf.get_string_len8('map_name') - map_arg = buf.get_uint32('map_arg') - _LOGGER.debug('map#%d: %s', map_arg, map_name) - room_count = buf.get_uint32('room_count') - for _ in range(room_count): - room_id = buf.get_uint8('room.id') - room_name = buf.get_string_len8('room.name') - if map_data_rooms is not None and room_id in map_data_rooms: - map_data_rooms[room_id].name = room_name - buf.skip('room.unknown1', 1) - room_text_pos = MapDataParserViomi.parse_position(buf, 'room.text_pos') - _LOGGER.debug('room#%d: %s %s', room_id, room_name, room_text_pos) - buf.skip('unknown1', 6) - - @staticmethod - def parse_room_outlines(buf: ParsingBuffer): - buf.skip('unknown1', 51) - room_count = buf.get_uint32('room_count') - for _ in range(room_count): - room_id = buf.get_uint32('room.id') - segment_count = buf.get_uint32('room.segment_count') - for _ in range(segment_count): - buf.skip('unknown2', 5) - _LOGGER.debug('room#%d: segment_count: %d', room_id, segment_count) - - @staticmethod - def parse_section(buf: ParsingBuffer, name: str, map_id: int): - buf.set_name(name) - magic = buf.get_uint32('magic') - if magic != map_id: - raise ValueError( - f"error parsing section {name} at offset {buf._offs - 4:#x}: magic check failed. " + - f"Magic: {magic:#x}, Map ID: {map_id:#x}") - - @staticmethod - def parse_position(buf: ParsingBuffer, name: str, with_angle: bool = False) -> Optional[Point]: - x = buf.get_float32(name + '.x') - y = buf.get_float32(name + '.y') - if x == MapDataParserViomi.POSITION_UNKNOWN or y == MapDataParserViomi.POSITION_UNKNOWN: - return None - a = None - if with_angle: - a = buf.get_float32(name + '.a') * 180 / math.pi - return Point(x, y, a) - - @staticmethod - def parse_unknown_section(buf: ParsingBuffer) -> bool: - n = buf._data[buf._offs:].find(buf._data[4:8]) - if n >= 0: - buf._offs += n - buf._length -= n - return True - else: - buf._offs += buf._length - buf._length = 0 - return False diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/parsing_buffer.py b/custom_components/xiaomi_cloud_map_extractor/viomi/parsing_buffer.py deleted file mode 100644 index 5acc334..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/parsing_buffer.py +++ /dev/null @@ -1,77 +0,0 @@ -import logging - -from struct import unpack_from - -_LOGGER = logging.getLogger(__name__) - - -class ParsingBuffer: - def __init__(self, name: str, data: bytes, start_offs: int, length: int): - self._name = name - self._data = data - self._offs = start_offs - self._length = length - self._image_beginning = None - - def set_name(self, name: str): - self._name = name - _LOGGER.debug('SECTION %s: offset 0x%x', self._name, self._offs) - - def mark_as_image_beginning(self): - self._image_beginning = self._offs - - def get_at_image(self, offset) -> int: - return self._data[self._image_beginning + offset - 1] - - def skip(self, field: str, n: int): - if self._length < n: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += n - self._length -= n - - def get_uint8(self, field: str) -> int: - if self._length < 1: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 1 - self._length -= 1 - return self._data[self._offs - 1] - - def get_uint16(self, field: str) -> int: - if self._length < 2: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 2 - self._length -= 2 - return unpack_from(' int: - if self._length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 4 - self._length -= 4 - return unpack_from(' float: - if self._length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 4 - self._length -= 4 - return unpack_from(' str: - n = self.get_uint8(field + '.len') - if self._length < n: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += n - self._length -= n - return self._data[self._offs - n:self._offs].decode('UTF-8') - - def peek_uint32(self, field: str) -> int: - if self._length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - return unpack_from(' MapData: - unzipped = zlib.decompress(raw_map) - return MapDataParserViomi.parse(unzipped, colors, drawables, texts, sizes, image_config) - - def get_map_archive_extension(self) -> str: - return "zlib" diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/__init__.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/__init__.py deleted file mode 100644 index 145a0fa..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Functionalities specific for Xiaomi and Roborock vacuums.""" diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py deleted file mode 100644 index 5739aea..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py +++ /dev/null @@ -1,83 +0,0 @@ -import logging -from typing import Set, Tuple - -from PIL import Image -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig - -_LOGGER = logging.getLogger(__name__) - - -class ImageHandlerXiaomi(ImageHandler): - MAP_OUTSIDE = 0x00 - MAP_WALL = 0x01 - MAP_INSIDE = 0xFF - MAP_SCAN = 0x07 - - @staticmethod - def parse(raw_data: bytes, width: int, height: int, carpet_map: Set[int], colors: Colors, - image_config: ImageConfig) -> Tuple[ImageType, dict]: - rooms = {} - scale = image_config[CONF_SCALE] - trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100) - trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100) - trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100) - trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100) - trimmed_height = height - trim_top - trim_bottom - trimmed_width = width - trim_left - trim_right - image = Image.new('RGBA', (trimmed_width, trimmed_height)) - if width == 0 or height == 0: - return ImageHandler.create_empty_map_image(colors), {} - pixels = image.load() - for img_y in range(trimmed_height): - for img_x in range(trimmed_width): - idx = img_x + trim_left + width * (img_y + trim_bottom) - pixel_type = raw_data[idx] - x = img_x - y = trimmed_height - img_y - 1 - if idx in carpet_map and (x + y) % 2: - pixels[x, y] = ImageHandler.__get_color__(COLOR_CARPETS, colors) - elif pixel_type == ImageHandlerXiaomi.MAP_OUTSIDE: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors) - elif pixel_type == ImageHandlerXiaomi.MAP_WALL: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL, colors) - elif pixel_type == ImageHandlerXiaomi.MAP_INSIDE: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_INSIDE, colors) - elif pixel_type == ImageHandlerXiaomi.MAP_SCAN: - pixels[x, y] = ImageHandler.__get_color__(COLOR_SCAN, colors) - else: - obstacle = pixel_type & 0x07 - if obstacle == 0: - pixels[x, y] = ImageHandler.__get_color__(COLOR_GREY_WALL, colors) - elif obstacle == 1: - pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL_V2, colors) - elif obstacle == 7: - room_number = (pixel_type & 0xFF) >> 3 - room_x = img_x + trim_left - room_y = img_y + trim_bottom - if room_number not in rooms: - rooms[room_number] = (room_x, room_y, room_x, room_y) - else: - rooms[room_number] = (min(rooms[room_number][0], room_x), - min(rooms[room_number][1], room_y), - max(rooms[room_number][2], room_x), - max(rooms[room_number][3], room_y)) - default = ImageHandler.ROOM_COLORS[room_number >> 1] - pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{room_number}", colors, default) - else: - pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors) - if image_config["scale"] != 1 and width != 0 and height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - return image, rooms - - @staticmethod - def get_room_at_pixel(raw_data: bytes, width: int, x: int, y: int) -> int: - room_number = None - pixel_type = raw_data[x + width * y] - if pixel_type not in [ImageHandlerXiaomi.MAP_INSIDE, ImageHandlerXiaomi.MAP_SCAN]: - if pixel_type & 0x07 == 7: - room_number = (pixel_type & 0xFF) >> 3 - return room_number diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py deleted file mode 100644 index 955c872..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py +++ /dev/null @@ -1,329 +0,0 @@ -import logging -from typing import Tuple, List, Set - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import * -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, Sizes, Texts -from custom_components.xiaomi_cloud_map_extractor.xiaomi.image_handler import ImageHandlerXiaomi - -_LOGGER = logging.getLogger(__name__) - - -class MapDataParserXiaomi(MapDataParser): - CHARGER = 1 - IMAGE = 2 - PATH = 3 - GOTO_PATH = 4 - GOTO_PREDICTED_PATH = 5 - CURRENTLY_CLEANED_ZONES = 6 - GOTO_TARGET = 7 - ROBOT_POSITION = 8 - NO_GO_AREAS = 9 - VIRTUAL_WALLS = 10 - BLOCKS = 11 - NO_MOPPING_AREAS = 12 - OBSTACLES = 13 - IGNORED_OBSTACLES = 14 - OBSTACLES_WITH_PHOTO = 15 - IGNORED_OBSTACLES_WITH_PHOTO = 16 - CARPET_MAP = 17 - MOP_PATH = 18 - NO_CARPET_AREAS = 19 - DIGEST = 1024 - SIZE = 1024 - KNOWN_OBSTACLE_TYPES = { - 0: 'cable', - 2: 'shoes', - 3: 'poop', - 5: 'extension cord', - 9: 'weighting scale', - 10: 'clothes' - } - - @staticmethod - def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes, - image_config: ImageConfig, *args, **kwargs) -> MapData: - map_data = MapData(25500, 1000) - map_header_length = MapDataParserXiaomi.get_int16(raw, 0x02) - map_data.major_version = MapDataParserXiaomi.get_int16(raw, 0x08) - map_data.minor_version = MapDataParserXiaomi.get_int16(raw, 0x0A) - map_data.map_index = MapDataParserXiaomi.get_int32(raw, 0x0C) - map_data.map_sequence = MapDataParserXiaomi.get_int32(raw, 0x10) - block_start_position = map_header_length - img_start = None - img_data = None - while block_start_position < len(raw): - block_header_length = MapDataParserXiaomi.get_int16(raw, block_start_position + 0x02) - header = MapDataParserXiaomi.get_bytes(raw, block_start_position, block_header_length) - block_type = MapDataParserXiaomi.get_int16(header, 0x00) - block_data_length = MapDataParserXiaomi.get_int32(header, 0x04) - block_data_start = block_start_position + block_header_length - data = MapDataParserXiaomi.get_bytes(raw, block_data_start, block_data_length) - - if block_type == MapDataParserXiaomi.CHARGER: - map_data.charger = MapDataParserXiaomi.parse_object_position(block_data_length, data) - elif block_type == MapDataParserXiaomi.IMAGE: - img_start = block_start_position - img_data_length = block_data_length - img_header_length = block_header_length - img_data = data - img_header = header - elif block_type == MapDataParserXiaomi.ROBOT_POSITION: - map_data.vacuum_position = MapDataParserXiaomi.parse_object_position(block_data_length, data) - elif block_type == MapDataParserXiaomi.PATH: - map_data.path = MapDataParserXiaomi.parse_path(block_start_position, header, raw) - elif block_type == MapDataParserXiaomi.GOTO_PATH: - map_data.goto_path = MapDataParserXiaomi.parse_path(block_start_position, header, raw) - elif block_type == MapDataParserXiaomi.GOTO_PREDICTED_PATH: - map_data.predicted_path = MapDataParserXiaomi.parse_path(block_start_position, header, raw) - elif block_type == MapDataParserXiaomi.CURRENTLY_CLEANED_ZONES: - map_data.zones = MapDataParserXiaomi.parse_zones(data, header) - elif block_type == MapDataParserXiaomi.GOTO_TARGET: - map_data.goto = MapDataParserXiaomi.parse_goto_target(data) - elif block_type == MapDataParserXiaomi.DIGEST: - map_data.is_valid = True - elif block_type == MapDataParserXiaomi.VIRTUAL_WALLS: - map_data.walls = MapDataParserXiaomi.parse_walls(data, header) - elif block_type == MapDataParserXiaomi.NO_GO_AREAS: - map_data.no_go_areas = MapDataParserXiaomi.parse_area(header, data) - elif block_type == MapDataParserXiaomi.NO_MOPPING_AREAS: - map_data.no_mopping_areas = MapDataParserXiaomi.parse_area(header, data) - elif block_type == MapDataParserXiaomi.OBSTACLES: - map_data.obstacles = MapDataParserXiaomi.parse_obstacles(data, header) - elif block_type == MapDataParserXiaomi.IGNORED_OBSTACLES: - map_data.ignored_obstacles = MapDataParserXiaomi.parse_obstacles(data, header) - elif block_type == MapDataParserXiaomi.OBSTACLES_WITH_PHOTO: - map_data.obstacles_with_photo = MapDataParserXiaomi.parse_obstacles(data, header) - elif block_type == MapDataParserXiaomi.IGNORED_OBSTACLES_WITH_PHOTO: - map_data.ignored_obstacles_with_photo = MapDataParserXiaomi.parse_obstacles(data, header) - elif block_type == MapDataParserXiaomi.BLOCKS: - block_pairs = MapDataParserXiaomi.get_int16(header, 0x08) - map_data.blocks = MapDataParserXiaomi.get_bytes(data, 0, block_pairs) - elif block_type == MapDataParserXiaomi.MOP_PATH: - points_mask = MapDataParserXiaomi.get_bytes(raw, block_data_start, block_data_length) - # only the map_data.path points where points_mask == 1 are in mop_path - map_data.mop_path = MapDataParserXiaomi.parse_mop_path(map_data.path, points_mask) - elif block_type == MapDataParserXiaomi.CARPET_MAP: - data = MapDataParserXiaomi.get_bytes(raw, block_data_start, block_data_length) - # only the indexes where value == 1 are in carpet_map - map_data.carpet_map = MapDataParserXiaomi.parse_carpet_map(data, image_config) - elif block_type == MapDataParserXiaomi.NO_CARPET_AREAS: - map_data.no_carpet_areas = MapDataParserXiaomi.parse_area(header, data) - else: - _LOGGER.debug("UNKNOWN BLOCK TYPE: %s, header length %s, data length %s", block_type, block_header_length, block_data_length) - block_start_position = block_start_position + block_data_length + MapDataParserXiaomi.get_int8(header, 2) - - if img_data: - image, rooms = MapDataParserXiaomi.parse_image(img_data_length, img_header_length, img_data, img_header, map_data.carpet_map, - colors, image_config) - map_data.image = image - map_data.rooms = rooms - - if not map_data.image.is_empty: - MapDataParserXiaomi.draw_elements(colors, drawables, sizes, map_data, image_config) - if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - map_data.vacuum_room = MapDataParserXiaomi.get_current_vacuum_room(img_start, raw, - map_data.vacuum_position) - ImageHandlerXiaomi.rotate(map_data.image) - ImageHandlerXiaomi.draw_texts(map_data.image, texts) - return map_data - - @staticmethod - def map_to_image(p: Point) -> Point: - return Point(p.x / MM, p.y / MM) - - @staticmethod - def image_to_map(x: float) -> float: - return x * MM - - @staticmethod - def get_current_vacuum_room(block_start_position: int, raw: bytes, vacuum_position: Point) -> int: - block_header_length = MapDataParserXiaomi.get_int16(raw, block_start_position + 0x02) - header = MapDataParserXiaomi.get_bytes(raw, block_start_position, block_header_length) - block_data_length = MapDataParserXiaomi.get_int32(header, 0x04) - block_data_start = block_start_position + block_header_length - data = MapDataParserXiaomi.get_bytes(raw, block_data_start, block_data_length) - image_top = MapDataParserXiaomi.get_int32(header, block_header_length - 16) - image_left = MapDataParserXiaomi.get_int32(header, block_header_length - 12) - image_width = MapDataParserXiaomi.get_int32(header, block_header_length - 4) - p = MapDataParserXiaomi.map_to_image(vacuum_position) - room = ImageHandlerXiaomi.get_room_at_pixel(data, image_width, round(p.x - image_left), round(p.y - image_top)) - return room - - @staticmethod - def parse_image(block_data_length: int, block_header_length: int, data: bytes, header: bytes, carpet_map: Set[int], - colors: Colors, image_config: ImageConfig) -> Tuple[ImageData, Dict[int, Room]]: - image_size = block_data_length - image_top = MapDataParserXiaomi.get_int32(header, block_header_length - 16) - image_left = MapDataParserXiaomi.get_int32(header, block_header_length - 12) - image_height = MapDataParserXiaomi.get_int32(header, block_header_length - 8) - image_width = MapDataParserXiaomi.get_int32(header, block_header_length - 4) - if image_width \ - - image_width * (image_config[CONF_TRIM][CONF_LEFT] + image_config[CONF_TRIM][CONF_RIGHT]) / 100 \ - < MINIMAL_IMAGE_WIDTH: - image_config[CONF_TRIM][CONF_LEFT] = 0 - image_config[CONF_TRIM][CONF_RIGHT] = 0 - if image_height \ - - image_height * (image_config[CONF_TRIM][CONF_TOP] + image_config[CONF_TRIM][CONF_BOTTOM]) / 100 \ - < MINIMAL_IMAGE_HEIGHT: - image_config[CONF_TRIM][CONF_TOP] = 0 - image_config[CONF_TRIM][CONF_BOTTOM] = 0 - image, rooms_raw = ImageHandlerXiaomi.parse(data, image_width, image_height, carpet_map, colors, image_config) - rooms = {} - for number, room in rooms_raw.items(): - rooms[number] = Room(number, MapDataParserXiaomi.image_to_map(room[0] + image_left), - MapDataParserXiaomi.image_to_map(room[1] + image_top), - MapDataParserXiaomi.image_to_map(room[2] + image_left), - MapDataParserXiaomi.image_to_map(room[3] + image_top)) - return ImageData(image_size, - image_top, - image_left, - image_height, - image_width, - image_config, - image, MapDataParserXiaomi.map_to_image), rooms - - @staticmethod - def parse_carpet_map(data: bytes, image_config: ImageConfig) -> Set[int]: - carpet_map = set() - - for i, v in enumerate(data): - if v: - carpet_map.add(i) - return carpet_map - - @staticmethod - def parse_goto_target(data: bytes) -> Point: - x = MapDataParserXiaomi.get_int16(data, 0x00) - y = MapDataParserXiaomi.get_int16(data, 0x02) - return Point(x, y) - - @staticmethod - def parse_object_position(block_data_length: int, data: bytes) -> Point: - x = MapDataParserXiaomi.get_int32(data, 0x00) - y = MapDataParserXiaomi.get_int32(data, 0x04) - a = None - if block_data_length > 8: - a = MapDataParserXiaomi.get_int32(data, 0x08) - if a > 0xFF: - a = (a & 0xFF) - 256 - return Point(x, y, a) - - @staticmethod - def parse_walls(data: bytes, header: bytes) -> List[Wall]: - wall_pairs = MapDataParserXiaomi.get_int16(header, 0x08) - walls = [] - for wall_start in range(0, wall_pairs * 8, 8): - x0 = MapDataParserXiaomi.get_int16(data, wall_start + 0) - y0 = MapDataParserXiaomi.get_int16(data, wall_start + 2) - x1 = MapDataParserXiaomi.get_int16(data, wall_start + 4) - y1 = MapDataParserXiaomi.get_int16(data, wall_start + 6) - walls.append(Wall(x0, y0, x1, y1)) - return walls - - @staticmethod - def parse_obstacles(data: bytes, header: bytes) -> List[Obstacle]: - obstacle_pairs = MapDataParserXiaomi.get_int16(header, 0x08) - obstacles = [] - if obstacle_pairs == 0: - return obstacles - obstacle_size = int(len(data) / obstacle_pairs) - for obstacle_start in range(0, obstacle_pairs * obstacle_size, obstacle_size): - x = MapDataParserXiaomi.get_int16(data, obstacle_start + 0) - y = MapDataParserXiaomi.get_int16(data, obstacle_start + 2) - details = {} - if obstacle_size >= 6: - details[ATTR_TYPE] = MapDataParserXiaomi.get_int16(data, obstacle_start + 4) - if details[ATTR_TYPE] in MapDataParserXiaomi.KNOWN_OBSTACLE_TYPES: - details[ATTR_DESCRIPTION] = MapDataParserXiaomi.KNOWN_OBSTACLE_TYPES[details[ATTR_TYPE]] - if obstacle_size >= 10: - u1 = MapDataParserXiaomi.get_int16(data, obstacle_start + 6) - u2 = MapDataParserXiaomi.get_int16(data, obstacle_start + 8) - details[ATTR_CONFIDENCE_LEVEL] = 0 if u2 == 0 else u1 * 10.0 / u2 - if obstacle_size == 28 and (data[obstacle_start + 12] & 0xFF) > 0: - txt = MapDataParserXiaomi.get_bytes(data, obstacle_start + 12, 16) - details[ATTR_PHOTO_NAME] = txt.decode('ascii') - obstacles.append(Obstacle(x, y, details)) - return obstacles - - @staticmethod - def parse_zones(data: bytes, header: bytes) -> List[Zone]: - zone_pairs = MapDataParserXiaomi.get_int16(header, 0x08) - zones = [] - for zone_start in range(0, zone_pairs * 8, 8): - x0 = MapDataParserXiaomi.get_int16(data, zone_start + 0) - y0 = MapDataParserXiaomi.get_int16(data, zone_start + 2) - x1 = MapDataParserXiaomi.get_int16(data, zone_start + 4) - y1 = MapDataParserXiaomi.get_int16(data, zone_start + 6) - zones.append(Zone(x0, y0, x1, y1)) - return zones - - @staticmethod - def parse_path(block_start_position: int, header: bytes, raw: bytes) -> Path: - path_points = [] - end_pos = MapDataParserXiaomi.get_int32(header, 0x04) - point_length = MapDataParserXiaomi.get_int32(header, 0x08) - point_size = MapDataParserXiaomi.get_int32(header, 0x0C) - angle = MapDataParserXiaomi.get_int32(header, 0x10) - start_pos = block_start_position + 0x14 - for pos in range(start_pos, start_pos + end_pos, 4): - x = MapDataParserXiaomi.get_int16(raw, pos) - y = MapDataParserXiaomi.get_int16(raw, pos + 2) - path_points.append(Point(x, y)) - return Path(point_length, point_size, angle, [path_points]) - - @staticmethod - def parse_mop_path(path: Path, mask: bytes) -> Path: - mop_paths = [] - points_num = 0 - for each_path in path.path: - mop_path_points = [] - for i, point in enumerate(each_path): - if mask[i]: - mop_path_points.append(point) - if (i + 1) < len(mask) and not mask[i + 1]: - points_num += len(mop_path_points) - mop_paths.append(mop_path_points) - mop_path_points = [] - - points_num += len(mop_path_points) - mop_paths.append(mop_path_points) - return Path(points_num, path.point_size, path.angle, mop_paths) - - @staticmethod - def parse_area(header: bytes, data: bytes) -> List[Area]: - area_pairs = MapDataParserXiaomi.get_int16(header, 0x08) - areas = [] - for area_start in range(0, area_pairs * 16, 16): - x0 = MapDataParserXiaomi.get_int16(data, area_start + 0) - y0 = MapDataParserXiaomi.get_int16(data, area_start + 2) - x1 = MapDataParserXiaomi.get_int16(data, area_start + 4) - y1 = MapDataParserXiaomi.get_int16(data, area_start + 6) - x2 = MapDataParserXiaomi.get_int16(data, area_start + 8) - y2 = MapDataParserXiaomi.get_int16(data, area_start + 10) - x3 = MapDataParserXiaomi.get_int16(data, area_start + 12) - y3 = MapDataParserXiaomi.get_int16(data, area_start + 14) - areas.append(Area(x0, y0, x1, y1, x2, y2, x3, y3)) - return areas - - @staticmethod - def get_bytes(data: bytes, start_index: int, size: int) -> bytes: - return data[start_index: start_index + size] - - @staticmethod - def get_int8(data: bytes, address: int) -> int: - return data[address] & 0xFF - - @staticmethod - def get_int16(data: bytes, address: int) -> int: - return \ - ((data[address + 0] << 0) & 0xFF) | \ - ((data[address + 1] << 8) & 0xFFFF) - - @staticmethod - def get_int32(data: bytes, address: int) -> int: - return \ - ((data[address + 0] << 0) & 0xFF) | \ - ((data[address + 1] << 8) & 0xFFFF) | \ - ((data[address + 2] << 16) & 0xFFFFFF) | \ - ((data[address + 3] << 24) & 0xFFFFFFFF) diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py deleted file mode 100644 index d3913d3..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py +++ /dev/null @@ -1,42 +0,0 @@ -import gzip -from typing import Optional - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts -from custom_components.xiaomi_cloud_map_extractor.xiaomi.map_data_parser import MapDataParserXiaomi - - -class XiaomiVacuum(XiaomiCloudVacuum): - - def __init__(self, connector, country, user_id, device_id, model): - super().__init__(connector, country, user_id, device_id, model) - - def get_map_url(self, map_name: str) -> Optional[str]: - url = self._connector.get_api_url(self._country) + "/home/getmapfileurl" - params = { - "data": '{"obj_name":"' + map_name + '"}' - } - api_response = self._connector.execute_api_call_encrypted(url, params) - if api_response is None or \ - "result" not in api_response or \ - api_response["result"] is None or \ - "url" not in api_response["result"]: - return None - return api_response["result"]["url"] - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> MapData: - unzipped = gzip.decompress(raw_map) - return MapDataParserXiaomi.parse(unzipped, colors, drawables, texts, sizes, image_config) - - def should_get_map_from_vacuum(self) -> bool: - return True - - def get_map_archive_extension(self) -> str: - return "gz" diff --git a/scripts/map_processor.py b/scripts/map_processor.py index 3608fa1..aabd93d 100644 --- a/scripts/map_processor.py +++ b/scripts/map_processor.py @@ -6,12 +6,21 @@ from homeassistant import config_entries # to fix circular imports from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME +from vacuum_map_parser_base.config.color import ColorsPalette, SupportedColor +from vacuum_map_parser_base.config.drawable import Drawable +from vacuum_map_parser_base.config.image_config import ImageConfig, TrimConfig +from vacuum_map_parser_base.config.size import Size, Sizes +from vacuum_map_parser_base.config.text import Text + from custom_components.xiaomi_cloud_map_extractor.camera import PLATFORM_SCHEMA, VacuumCamera from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.dreame.vacuum import DreameVacuum -from custom_components.xiaomi_cloud_map_extractor.roidmi.vacuum import RoidmiVacuum -from custom_components.xiaomi_cloud_map_extractor.viomi.vacuum import ViomiVacuum -from custom_components.xiaomi_cloud_map_extractor.xiaomi.vacuum import XiaomiVacuum +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.vacuum_base import VacuumConfig +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.vacuum_dreame import DreameCloudVacuum +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.vacuum_roidmi import RoidmiCloudVacuum +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.vacuum_unsupported import UnsupportedCloudVacuum +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.vacuum_viomi import ViomiCloudVacuum +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.vacuum_roborock import RoborockCloudVacuum +from custom_components.xiaomi_cloud_map_extractor.vacuum_platforms.xiaomi_cloud_connector import XiaomiCloudConnector logging.basicConfig() logging.getLogger().setLevel(logging.WARNING) @@ -75,34 +84,55 @@ def parse_map_file(map_config, map_filename, api, suffix=""): texts = map_config[CONF_TEXTS] sizes = map_config[CONF_SIZES] transform = map_config[CONF_MAP_TRANSFORM] - for room, color in room_colors.items(): - colors[f"{COLOR_ROOM_PREFIX}{room}"] = color drawables = map_config[CONF_DRAW] if DRAWABLE_ALL in drawables: drawables = CONF_AVAILABLE_DRAWABLES[1:] + image_config = ImageConfig(**{**transform, "trim": TrimConfig(**transform["trim"])}) + + vacuum_config = VacuumConfig( + XiaomiCloudConnector("", ""), + "", + "", + "", + "", + "", + "", + palette=ColorsPalette(colors_dict=colors, room_colors={str(k): v for k, v in room_colors.items()}), + drawables=drawables, + texts=[Text(**t) for t in texts], + sizes=Sizes(sizes), + image_config=image_config, + store_map_path=None + ) + map_data = None try: if api == CONF_AVAILABLE_API_XIAOMI: - map_data = XiaomiVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform) + vacuum = RoborockCloudVacuum(vacuum_config) elif api == CONF_AVAILABLE_API_VIOMI: - map_data = ViomiVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform) + vacuum = ViomiCloudVacuum(vacuum_config) elif api == CONF_AVAILABLE_API_ROIDMI: - map_data = RoidmiVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform) + vacuum = RoidmiCloudVacuum(vacuum_config) elif api == CONF_AVAILABLE_API_DREAME: - map_data = DreameVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform) + vacuum = DreameCloudVacuum(vacuum_config) + else: + vacuum = UnsupportedCloudVacuum(vacuum_config) + map_data = vacuum.decode_and_parse(map_file) + except AttributeError as ae: + print(f" Failed to parse map data! {ae}") except Exception as e: - print(f"Failed to parse map data! {e}") - if map_data is not None: + print(f" Failed to parse map data! {e}") + if map_data is not None and map_data.image is not None: map_data.image.data.save(f"{map_filename}{suffix}.png") - print(f"Map image saved to \"{map_filename}{suffix}.png\"") + print(f" Map image saved to \"{map_filename}{suffix}.png\"") attributes_output_file = open(f"{map_filename}{suffix}.yaml", "w") attributes = VacuumCamera.extract_attributes(map_data, CONF_AVAILABLE_ATTRIBUTES, "") yaml.dump(attributes_to_dict(attributes), attributes_output_file) attributes_output_file.close() - print(f"Map attributes saved to \"{map_filename}{suffix}.yaml\"") + print(f" Map attributes saved to \"{map_filename}{suffix}.yaml\"") else: - print("Failed to parse map data!") + print(" Failed to parse map data!") def run_download(map_config, data_output_dir): @@ -123,7 +153,6 @@ def run_test(map_config, test_dir): print(api) for file in filter(lambda ff: os.path.isfile(ff), map(lambda f: f"{test_dir}/{api}/{f}", os.listdir(f"{test_dir}/{api}"))): - print(" " + file) output = file + "_output" if not os.path.exists(output): os.mkdir(output)