diff --git a/custom_components/open_epaper_link/__init__.py b/custom_components/open_epaper_link/__init__.py index 2111126..f9dd237 100644 --- a/custom_components/open_epaper_link/__init__.py +++ b/custom_components/open_epaper_link/__init__.py @@ -9,6 +9,7 @@ import pprint import time +from .tag_types import get_tag_types_manager from .util import send_tag_cmd, reboot_ap _LOGGER = logging.getLogger(__name__) @@ -138,6 +139,24 @@ async def scan_channels_service(service: ServiceCall)-> None: async def reboot_ap_service(service: ServiceCall)-> None: await reboot_ap(hass) + async def refresh_tag_types_service(service: ServiceCall) -> None: + """Service to force refresh of tag types.""" + manager = await get_tag_types_manager(hass) + # Force a refresh by clearing the last update timestamp + manager._last_update = None + await manager.ensure_types_loaded() + tag_types_len = len(manager.get_all_types()) + message = f"Successfully refreshed {tag_types_len} tag types from GitHub" + await hass.services.async_call( + "persistent_notification", + "create", + { + "title": "Tag Types Refreshed", + "message:": message, + "notification_id": "tag_types_refresh_notification", + }, + ) + # register the services hass.services.register(DOMAIN, "dlimg", dlimg) hass.services.register(DOMAIN, "lines5", lines5service) @@ -149,6 +168,7 @@ async def reboot_ap_service(service: ServiceCall)-> None: hass.services.register(DOMAIN, "reboot_tag", reboot_tag_service) hass.services.register(DOMAIN, "scan_channels", scan_channels_service) hass.services.register(DOMAIN, "reboot_ap", reboot_ap_service) + hass.services.register(DOMAIN, "refresh_tag_types", refresh_tag_types_service) # error handling needs to be improved return True diff --git a/custom_components/open_epaper_link/button.py b/custom_components/open_epaper_link/button.py index 199390b..c5965e0 100644 --- a/custom_components/open_epaper_link/button.py +++ b/custom_components/open_epaper_link/button.py @@ -7,7 +7,7 @@ import json import logging -from .hw_map import get_hw_dimensions +from .tag_types import get_hw_dimensions, get_tag_types_manager from .util import send_tag_cmd, reboot_ap from .const import DOMAIN @@ -23,6 +23,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e buttons.append(ScanChannelsButton(hass, tag_mac, hub)) buttons.append(IdentifyTagButton(hass,tag_mac,hub)) buttons.append(RebootAPButton(hass, hub)) + buttons.append(RefreshTagTypesButton(hass)) async_add_entities(buttons) class ClearPendingTagButton(ButtonEntity): @@ -195,4 +196,42 @@ async def async_press(self) -> None: if response.status_code == 200: _LOGGER.info(f"Sent identify command to tag {self._tag_mac}") except requests.RequestException as err: - _LOGGER.error(f"Failed to send identify command to tag {self._tag_mac}: {err}") \ No newline at end of file + _LOGGER.error(f"Failed to send identify command to tag {self._tag_mac}: {err}") + +class RefreshTagTypesButton(ButtonEntity): + """Button to manually refresh tag types from GitHub.""" + + def __init__(self, hass: HomeAssistant) -> None: + self._hass = hass + self._attr_unique_id = "refresh_tag_types" + self._attr_name = "Refresh Tag Types" + self._attr_entity_category = EntityCategory.DIAGNOSTIC + self._attr_icon = "mdi:refresh" + + @property + def device_info(self): + """Return device info.""" + return { + "identifiers": {(DOMAIN, "ap")}, + "name": "OpenEPaperLink AP", + "model": "esp32", + "manufacturer": "OpenEPaperLink", + } + + async def async_press(self) -> None: + """Trigger a manual refresh of tag types.""" + manager = await get_tag_types_manager(self._hass) + # Force a refresh by clearing the last update timestamp + manager._last_update = None + await manager.ensure_types_loaded() + tag_types_len = len(manager.get_all_types()) + message = f"Successfully refreshed {tag_types_len} tag types from GitHub" + await self.hass.services.async_call( + "persistent_notification", + "create", + { + "title": "Tag Types Refreshed", + "message": message, + "notification_id": "tag_types_refresh_notification", + }, + ) \ No newline at end of file diff --git a/custom_components/open_epaper_link/hub.py b/custom_components/open_epaper_link/hub.py index eb4437c..3412d84 100644 --- a/custom_components/open_epaper_link/hub.py +++ b/custom_components/open_epaper_link/hub.py @@ -11,14 +11,14 @@ import logging import os from threading import Thread -from typing import Any +from typing import Any, Final from homeassistant.core import HomeAssistant from homeassistant.config_entries import ConfigEntry from homeassistant.helpers import device_registry as dr from .button import ClearPendingTagButton from .const import DOMAIN -from .hw_map import is_in_hw_map, get_hw_string, get_hw_dimensions +from .tag_types import get_hw_dimensions, get_hw_string, is_in_hw_map, get_tag_types_manager _LOGGER: Final = logging.getLogger(__name__) @@ -48,138 +48,201 @@ def __init__(self, hass: HomeAssistant, host: str, cfgentry: str) -> None: self.eventloop = asyncio.get_event_loop() self._is_fetching_config = False self.eventloop.create_task(self.fetch_ap_config()) + self._tag_manager = None + self.tag_manager_initialized = asyncio.Event() + self._attempted_tag_type_refreshes = set() + self._pending_states = {} thread = Thread(target=self.connection_thread) + self._hass.async_create_task(self._init_tag_manager()) thread.start() self.online = True - #parses websocket messages - def on_message(self,ws, message) -> None: - data = json.loads('{' + message.split("{", 1)[-1]) + + async def _init_tag_manager(self) -> None: + """Initialize tag manager.""" + self._tag_manager = await get_tag_types_manager(self._hass) + self.tag_manager_initialized.set() # Signal that initialization is complete + + async def _try_refresh_hw_type(self, hw_type: int) -> bool: + """Try to refresh tag types from GitHub for a missing hardware type.""" + if hw_type in self._attempted_tag_type_refreshes: + return False + + self._attempted_tag_type_refreshes.add(hw_type) + _LOGGER.info(f"Attempting to refresh tag types for unknown hardware type {hw_type}") + + # Force a refresh of tag types + manager = self._tag_manager + manager._last_update = None + await manager.ensure_types_loaded() + + # Check if the hardware type is now available + success = manager.is_in_hw_map(hw_type) + if success: + _LOGGER.info(f"Successfully retrieved definition for hardware type {hw_type}") + + # Apply any pending states for this hardware type + if hw_type in self._pending_states: + for state_info in self._pending_states[hw_type]: + await self._apply_tag_state(state_info) + del self._pending_states[hw_type] + else: + _LOGGER.warning( + f"Hardware type {hw_type} not found in definitions, even after refresh attempt. " + f"Please report this to the OpenEPaperLink project if this is a valid tag type." + ) + return success + + async def _set_tag_state(self, tag_mac: str, hw_type: int, tag_name: str) -> None: + """Set up the entity state for a tag.""" + width, height = self._tag_manager.get_hw_dimensions(hw_type) + self._hass.states.set(DOMAIN + "." + tag_mac, hw_type, { + "icon": "mdi:fullscreen", + "friendly_name": tag_name, + "attr_unique_id": tag_mac, + "unique_id": tag_mac, + "device_class": "sensor", + "device_info": { + "identifiers": {(DOMAIN, tag_mac)} + }, + "should_poll": False, + "hwtype": hw_type, + "hwstring": self._tag_manager.get_hw_string(hw_type), + "width": width, + "height": height, + }) + + async def _apply_tag_state(self, state_info: dict) -> None: + """Apply a pending tag state.""" + await self._set_tag_state( + state_info["tagmac"], + state_info["hwType"], + state_info["tagname"] + ) + + def _store_tag_data(self, tag_mac: str, tag_data: dict, tag_name: str) -> None: + """Store tag data in the hub's data dictionary.""" + self.data[tag_mac] = { + "temperature": tag_data.get('temperature'), + "rssi": tag_data.get('RSSI'), + "battery": tag_data.get('batteryMv'), + "lqi": tag_data.get('LQI'), + "hwtype": tag_data.get('hwType'), + "hwstring": self._tag_manager.get_hw_string(tag_data.get('hwType')), + "contentmode": tag_data.get('contentMode'), + "lastseen": tag_data.get('lastseen'), + "nextupdate": tag_data.get('nextupdate'), + "nextcheckin": tag_data.get('nextcheckin'), + "pending": tag_data.get('pending'), + "wakeupReason": tag_data.get('wakeupReason'), + "capabilities": tag_data.get('capabilities'), + "external": tag_data.get('isexternal'), + "alias": tag_data.get('alias'), + "hashv": tag_data.get('hash'), + "modecfgjson": tag_data.get('modecfgjson'), + "rotate": tag_data.get('rotate'), + "lut": tag_data.get('lut'), + "ch": tag_data.get('ch'), + "ver": tag_data.get('ver'), + "tagname": tag_name + } + + def on_message(self, ws, message) -> None: + # Wait for the tag manager to initialize if needed + if not self.tag_manager_initialized.is_set(): + future = asyncio.run_coroutine_threadsafe(self.tag_manager_initialized.wait(), self.eventloop) + future.result() + + data = json.loads('{' + message.split("{", 1)[-1]) if 'sys' in data: + # Handle system data sys = data.get('sys') - systime = sys.get('currtime') - heap = sys.get('heap') - recordcount = sys.get('recordcount') - dbsize = sys.get('dbsize') - littlefsfree = sys.get('littlefsfree') - apstate = sys.get('apstate') - runstate = sys.get('runstate') - temp = sys.get('temp') - rssi = sys.get('rssi') - wifistatus = sys.get('wifistatus') - wifissid = sys.get('wifissid') - self._hass.states.set(DOMAIN + ".ip", self._host,{"icon": "mdi:ip","friendly_name": "AP IP","should_poll": False}) - self.data["ap"] = dict() - self.data["ap"]["ip"] = self._host - self.data["ap"]["systime"] = systime - self.data["ap"]["heap"] = heap - self.data["ap"]["recordcount"] = recordcount - self.data["ap"]["dbsize"] = dbsize - self.data["ap"]["littlefsfree"] = littlefsfree - self.data["ap"]["rssi"] = rssi - self.data["ap"]["apstate"] = apstate - self.data["ap"]["runstate"] = runstate - self.data["ap"]["temp"] = temp - self.data["ap"]["wifistatus"] = wifistatus - self.data["ap"]["wifissid"] = wifissid + self._handle_sys_data(sys) elif 'tags' in data: tag = data.get('tags')[0] tagmac = tag.get('mac') - lastseen = tag.get('lastseen') - nextupdate = tag.get('nextupdate') - nextcheckin = tag.get('nextcheckin') - LQI = tag.get('LQI') - RSSI = tag.get('RSSI') - temperature = tag.get('temperature') - batteryMv = tag.get('batteryMv') - pending = tag.get('pending') - hwType = tag.get('hwType') - contentMode = tag.get('contentMode') - alias = tag.get('alias') - wakeupReason = tag.get('wakeupReason') - capabilities = tag.get('capabilities') - hashv = tag.get('hash') - modecfgjson = tag.get('modecfgjson') - isexternal = tag.get('isexternal') - rotate = tag.get('rotate') - lut = tag.get('lut') - ch = tag.get('ch') - ver = tag.get('ver') - tagname = "" - if alias: - tagname = alias - else: - tagname = tagmac - #required for automations - if is_in_hw_map(hwType): - width, height = get_hw_dimensions(hwType) - self._hass.states.set(DOMAIN + "." + tagmac, hwType,{ - "icon": "mdi:fullscreen", - "friendly_name": tagname, - "attr_unique_id": tagmac, - "unique_id": tagmac, - "device_class": "sensor", - "device_info": { - "identifiers": {(DOMAIN, tagmac)} - }, - "should_poll": False, - "hwtype": hwType, - "hwstring": get_hw_string(hwType), - "width": width, - "height": height, - }) - else: - _LOGGER.warning(f"ID {hwType} not in hwmap, please open an issue on github about this.") - - self.data[tagmac] = dict() - self.data[tagmac]["temperature"] = temperature - self.data[tagmac]["rssi"] = RSSI - self.data[tagmac]["battery"] = batteryMv - self.data[tagmac]["lqi"] = LQI - self.data[tagmac]["hwtype"] = hwType - self.data[tagmac]["hwstring"] = get_hw_string(hwType) - self.data[tagmac]["contentmode"] = contentMode - self.data[tagmac]["lastseen"] = lastseen - self.data[tagmac]["nextupdate"] = nextupdate - self.data[tagmac]["nextcheckin"] = nextcheckin - self.data[tagmac]["pending"] = pending - self.data[tagmac]["wakeupReason"] = wakeupReason - self.data[tagmac]["capabilities"] = capabilities - self.data[tagmac]["external"] = isexternal - self.data[tagmac]["alias"] = alias - self.data[tagmac]["hashv"] = hashv - self.data[tagmac]["modecfgjson"] = modecfgjson - self.data[tagmac]["rotate"] = rotate - self.data[tagmac]["lut"] = lut - self.data[tagmac]["ch"] = ch - self.data[tagmac]["ver"] = ver - self.data[tagmac]["tagname"] = tagname - #maintains a list of all tags, new entities should be generated here + + # Handle tag data asynchronously + asyncio.run_coroutine_threadsafe( + self._handle_tag_data(tagmac, tag), + self.eventloop + ) + + # Handle tag registration if needed if tagmac not in self.esls: self.esls.append(tagmac) - loop = self.eventloop - asyncio.run_coroutine_threadsafe(self.reloadcfgett(),loop) - #fire event with the wakeup reason - lut = {0: "TIMED",1: "BOOT",2: "GPIO",3: "NFC",4: "BUTTON1",5: "BUTTON2",252: "FIRSTBOOT",253: "NETWORK_SCAN",254: "WDT_RESET"} + asyncio.run_coroutine_threadsafe( + self.reloadcfgett(), + self.eventloop + ) + + # Fire wakeup event + lut = {0: "TIMED", 1: "BOOT", 2: "GPIO", 3: "NFC", 4: "BUTTON1", 5: "BUTTON2", + 252: "FIRSTBOOT", 253: "NETWORK_SCAN", 254: "WDT_RESET"} event_data = { "device_id": tagmac, - "type": lut[wakeupReason], + "type": lut[tag.get('wakeupReason')], } self._hass.bus.fire(DOMAIN + "_event", event_data) - elif 'errMsg' in data: - errmsg = data.get('errMsg') - elif 'logMsg' in data: - logmsg = data.get('logMsg') - elif 'apitem' in data: - _LOGGER.debug(f"AP item: {data.get('apitem')}") - if not self._is_fetching_config: - self.eventloop.call_soon_threadsafe( - lambda: self.eventloop.create_task(self.fetch_ap_config()) - ) - logmsg = data.get('apitem') - else: - _LOGGER.debug("Unknown msg") - _LOGGER.debug(data) - #log websocket errors + + def _handle_sys_data(self, sys: dict) -> None: + """Handle system data updates.""" + self.data["ap"] = { + "ip": self._host, + "systime": sys.get('currtime'), + "heap": sys.get('heap'), + "recordcount": sys.get('recordcount'), + "dbsize": sys.get('dbsize'), + "littlefsfree": sys.get('littlefsfree'), + "rssi": sys.get('rssi'), + "apstate": sys.get('apstate'), + "runstate": sys.get('runstate'), + "temp": sys.get('temp'), + "wifistatus": sys.get('wifistatus'), + "wifissid": sys.get('wifissid') + } + self._hass.states.set( + DOMAIN + ".ip", + self._host, + { + "icon": "mdi:ip", + "friendly_name": "AP IP", + "should_poll": False + } + ) + + async def _handle_tag_data(self, tagmac: str, tag_data: dict) -> None: + """Handle incoming tag data and manage hardware type verification.""" + hwType = tag_data.get('hwType') + tagname = tag_data.get('alias') or tagmac + + # Store basic tag data regardless of hardware type status + self._store_tag_data(tagmac, tag_data, tagname) + + # Check if we need to handle the hardware type + if not self._tag_manager.is_in_hw_map(hwType): + # Try refreshing if we haven't attempted for this type before + if hwType not in self._attempted_tag_type_refreshes: + state_info = { + "tagmac": tagmac, + "hwType": hwType, + "tagname": tagname, + } + + # Store the pending state + if hwType not in self._pending_states: + self._pending_states[hwType] = [] + self._pending_states[hwType].append(state_info) + + # Attempt refresh + success = await self._try_refresh_hw_type(hwType) + if not success: + return # Warning will have been logged by _try_refresh_hw_type + + # If we have the hardware type definition, set up the entity + if self._tag_manager.is_in_hw_map(hwType): + await self._set_tag_state(tagmac, hwType, tagname) + + # log websocket errors def on_error(self,ws, error) -> None: _LOGGER.debug("Websocket error, most likely on_message crashed") _LOGGER.debug(error) @@ -192,7 +255,7 @@ def on_close(self, ws, close_status_code, close_msg) -> None: def on_open(self,ws) -> None: _LOGGER.debug("WS started") - #starts the websocket + # starts the websocket def connection_thread(self) -> None: while True: try: diff --git a/custom_components/open_epaper_link/hw_map.py b/custom_components/open_epaper_link/hw_map.py deleted file mode 100644 index e5e09ca..0000000 --- a/custom_components/open_epaper_link/hw_map.py +++ /dev/null @@ -1,60 +0,0 @@ -from typing import Dict, Tuple - -HW_MAP: Dict[int, Tuple[str, int, int]] = { - 0: ["M2 1.54\"", 152, 152], - 1: ["M2 2.9\"", 296, 128], - 2: ["M2 4.2\"", 400, 300], - 3: ["M2 2.2\"", 212, 104], - 4: ["M2 2.6\"", 296, 152], - 5: ["M2 7.4\"", 640, 384], - 17: ["M2 2.9\" (NFC)", 296, 128], - 18: ["M2 4.2\" (NFC)", 400, 300], - 26: ["M2 7.4\" (outdated)", 640, 384], - 33: ["M2 2.9\"", 296, 128], - 38: ["M2 7.4 BW\"", 640, 384], - 39: ["M2 2.9 BW\"", 296, 128], - 45: ["M3 12.2\"", 960, 768], - 46: ["M3 9.7\"", 960, 672], - 47: ["M3 4.3\"", 522, 152], - 48: ["M3 1.6\"", 200, 200], - 49: ["M3 2.2\"", 296, 160], - 50: ["M3 2.6\"", 360, 184], - 51: ["M3 2.9\"", 384, 168], - 52: ["M3 4.2\"", 400, 300], - 53: ["M3 6.0\"", 600, 448], - 54: ["M3 7.5\"", 800, 480], - 67: ["M3 1.3\" Peghook", 144, 200], - 69: ["M3 2.2\" Lite", 250, 128], - 70: ["M3 2.2\" BW", 296, 160], - 84: ["HS BW 2.13\"", 256, 128], - 85: ["HS BWR 2.13\"", 256, 128], - 86: ["HS BWR 2.66\"", 296, 152], - 96: ["HS BWY 3.5\"", 384, 184], - 97: ["HS BWR 3.5\"", 384, 184], - 98: ["HS BW 3.5\"", 384, 184], - 102: ["HS BWY 7.5\"", 800, 480], - 103: ["HS BWY 2.0\"", 152, 200], - 128: ["Chroma 7.4\"", 640, 384], - 130: ["Chroma29 2.9\"", 296, 128], - 131: ["Chroma42 4\"", 400, 300], - 176: ["Gicisky BLE EPD BW 2.1\"", 212, 104], - 177: ["Gicisky BLE EPD BWR 2.13\"", 250, 128], - 178: ["Gicisky BLE EPD BW 2.9\"", 296, 128], - 179: ["Gicisky BLE EPD BWR 2.9\"", 296, 128], - 181: ["Gicisky BLE EPD BWR 4.2\"", 400, 300], - 186: ["Gicisky BLE TFT 2.13\"", 250, 132], - 191: ["Gicisky BLE Unknown", 0, 0], - 190: ["ATC MiThermometer BLE", 6, 8], - 224: ["AP display", 320, 170], - 225: ["AP display", 160, 80], - 226: ["LILYGO TPANEL", 480, 480], - 240: ["Segmented", 0, 0] -} -def is_in_hw_map(hw_type: int) -> bool: - return hw_type in HW_MAP - -def get_hw_string(hw_type: int) -> str: - return HW_MAP.get(hw_type, "NOT_IN_HW_MAP")[0] - -def get_hw_dimensions(hw_type: int) -> Tuple[int, int]: - return HW_MAP.get(hw_type)[1:] diff --git a/custom_components/open_epaper_link/icons.json b/custom_components/open_epaper_link/icons.json index 460a94e..989d2c4 100644 --- a/custom_components/open_epaper_link/icons.json +++ b/custom_components/open_epaper_link/icons.json @@ -29,6 +29,9 @@ }, "reboot_ap": { "service": "mdi:restart" + }, + "refresh_tag_types": { + "service": "mdi:refresh" } } } \ No newline at end of file diff --git a/custom_components/open_epaper_link/sensor.py b/custom_components/open_epaper_link/sensor.py index d8716d5..555aa89 100644 --- a/custom_components/open_epaper_link/sensor.py +++ b/custom_components/open_epaper_link/sensor.py @@ -1,4 +1,6 @@ from __future__ import annotations +from typing import Final +from homeassistant.helpers.device_registry import DeviceInfo from .const import DOMAIN import logging import datetime diff --git a/custom_components/open_epaper_link/services.yaml b/custom_components/open_epaper_link/services.yaml index c4a62e8..9823807 100644 --- a/custom_components/open_epaper_link/services.yaml +++ b/custom_components/open_epaper_link/services.yaml @@ -380,4 +380,7 @@ scan_channels: domain: open_epaper_link reboot_ap: name: Reboot AP - description: Reboots the AP \ No newline at end of file + description: Reboots the AP +refresh_tag_types: + name: Refresh Tag Types + description: Force refresh of tag type definitions from GitHub \ No newline at end of file diff --git a/custom_components/open_epaper_link/tag_types.py b/custom_components/open_epaper_link/tag_types.py new file mode 100644 index 0000000..d100379 --- /dev/null +++ b/custom_components/open_epaper_link/tag_types.py @@ -0,0 +1,389 @@ +from __future__ import annotations + +import os.path + +import aiohttp +import asyncio +import json +import logging +from typing import Dict, Tuple, Optional, Any, cast, List +from datetime import datetime, timedelta + +from homeassistant.core import HomeAssistant + +_LOGGER = logging.getLogger(__name__) + +GITHUB_API_URL = "https://api.github.com/repos/OpenEPaperLink/OpenEPaperLink/contents/resources/tagtypes" +GITHUB_RAW_URL = "https://raw.githubusercontent.com/OpenEPaperLink/OpenEPaperLink/master/resources/tagtypes" +CACHE_DURATION = timedelta(hours=48) # Cache tag definitions for 48 hours +STORAGE_VERSION = 1 + +class TagType: + def __init__(self, type_id: int, data: dict): + self.type_id = type_id + self.version = data.get('version', 1) + self.name = data.get('name', f"Unknown Type {type_id}") + self.width = data.get('width', 296) + self.height = data.get('height', 128) + self.rotate_buffer = data.get('rotate_buffer', 0) + self.bpp = data.get('bpp', 2) + self.color_table = data.get('colortable', { + 'white': [255, 255, 255], + 'black': [0, 0, 0], + 'red': [255, 0, 0], + }) + self.short_lut = data.get('shortlut', 2) + self.options = data.get('options', []) + self.content_ids = data.get('contentids', []) + self.template = data.get('template', {}) + self.use_template = data.get('usetemplate', None) + self.zlib_compression = data.get('zlib_compression', None) + self._raw_data = data + + + def to_dict(self) -> dict: + return { + 'version': self.version, + 'name': self.name, + 'width': self.width, + 'height': self.height, + 'rotate_buffer': self.rotate_buffer, + 'bpp': self.bpp, + 'colortable': self.color_table, + 'shortlut': self.short_lut, + 'options': list(self.options), + 'contentids': list(self.content_ids), + 'template': self.template, + 'usetemplate': self.use_template, + 'zlib_compression': self.zlib_compression, + } + + @classmethod + def from_dict(cls, data: dict) -> TagType: + """Create TagType from stored dictionary.""" + raw_data = { + 'version': data.get('version', 1), + 'name': data.get('name'), + 'width': data.get('width'), + 'height': data.get('height'), + 'rotatebuffer': data.get('rotate_buffer'), + 'bpp': data.get('bits_per_pixel'), + 'shortlut': data.get('short_lut'), + 'colortable': data.get('color_table'), + 'options': data.get('options', []), + 'contentids': data.get('content_ids', []), + 'template': data.get('template', {}), + } + return cls(data.get('type_id'), raw_data) + + def get(self, attr: str, default: Any = None) -> Any: + """Get attribute value, supporting dict-like access.""" + return getattr(self, attr, default) + +class TagTypesManager: + """Manages tag type definitions fetched from GitHub.""" + + def __init__(self, hass: HomeAssistant) -> None: + self._hass = hass + self._tag_types: Dict[int, TagType] = {} + self._last_update: Optional[datetime] = None + self._lock = asyncio.Lock() + self._storage_file = self._hass.config.path("open_epaper_link_tagtypes.json") + _LOGGER.debug("TagTypesManager instance created") + + async def load_stored_data(self) -> None: + try: + _LOGGER.debug("Attempting to load stored tag types from %s", self._storage_file) + + def load_json(): + """Load JSON file if it exists.""" + if os.path.exists(self._storage_file): + with open(self._storage_file, 'r', encoding='utf-8') as fp: + return json.load(fp) + return None + + stored_data = await self._hass.async_add_executor_job(load_json) + + if stored_data and stored_data.get('version') == STORAGE_VERSION: + _LOGGER.info("Found valid stored tag types data") + self._last_update = datetime.fromisoformat(stored_data.get('last_update')) + + # Convert stored data back to TagType objects + self._tag_types = {} + for type_id_str, type_data in stored_data.get('tag_types', {}).items(): + try: + type_id = int(type_id_str) + self._tag_types[type_id] = TagType.from_dict(type_data) + _LOGGER.debug("Loaded tag type %d: %s", + type_id, self._tag_types[type_id].name) + except Exception as e: + _LOGGER.error("Error loading tag type %s: %s", type_id_str, str(e)) + + _LOGGER.info("Loaded %d tag types from storage", len(self._tag_types)) + return + else: + _LOGGER.warning("Stored data invalid or wrong version, will fetch from GitHub") + + except Exception as e: + _LOGGER.error("Error loading stored tag types: %s", str(e), exc_info=True) + + # If we get here, either no stored data or invalid data + await self._fetch_tag_types() + + + async def save_to_storage(self) -> None: + """Save tag types to storage.""" + + try: + _LOGGER.debug("Saving tag types to storage") + data = { + 'version': STORAGE_VERSION, + 'last_update': self._last_update.isoformat(), + 'tag_types': { + str(type_id): tag_type.to_dict() + for type_id, tag_type in self._tag_types.items() + } + } + + def write_json(): + temp_file = f"{self._storage_file}.temp" + try: + with open(temp_file, 'w', encoding='utf-8') as file: + json.dump(data, file, default=str, indent=2) + os.replace(temp_file, self._storage_file) + except Exception as e: + _LOGGER.error(f"Error writing tag types to storage: {str(e)}") + if os.path.exists(temp_file): + os.remove(temp_file) + raise e + + await self._hass.async_add_executor_job(write_json) + _LOGGER.debug("Tag types saved to storage") + except Exception as e: + _LOGGER.error(f"Error saving tag types to storage: {str(e)}") + + + + async def ensure_types_loaded(self) -> None: + """Ensure tag types are loaded and not too old.""" + async with self._lock: + _LOGGER.debug(f"Last update: {self._last_update}, {datetime.now()}") + if not self._tag_types: + await self.load_stored_data() + + elif (not self._last_update or + datetime.now() - self._last_update > CACHE_DURATION): + await self._fetch_tag_types() + + async def _fetch_tag_types(self) -> None: + """Fetch tag type definitions from GitHub.""" + try: + async with aiohttp.ClientSession() as session: + # First get the directory listing from GitHub API + headers = {"Accept": "application/vnd.github.v3+json"} + async with session.get(GITHUB_API_URL, headers=headers) as response: + if response.status != 200: + raise Exception(f"GitHub API returned status {response.status}") + + directory_contents = await response.json() + + # Filter for .json files and extract type IDs + type_files = [] + for item in directory_contents: + if item["name"].endswith(".json"): + # Try to extract type ID from filename + try: + base_name = item["name"][:-5] # Remove .json extension + try: + type_id = int(base_name, 16) + _LOGGER.debug(f"Parsed hex type ID {base_name} -> {type_id}") + type_files.append((type_id, item["download_url"])) + continue + except ValueError: + pass + + # If not hex, try decimal + try: + type_id = int(base_name) + _LOGGER.debug(f"Parsed decimal type ID {base_name} -> {type_id}") + type_files.append((type_id, item["download_url"])) + continue + except ValueError: + pass + _LOGGER.warning(f"Could not parse type ID from filename: {item['name']}") + + except Exception as e: + _LOGGER.warning(f"Error processing filename {item['name']}: {str(e)}") + + # Now fetch all found definitions + new_types = {} + for hw_type, url in type_files: + try: + async with session.get(url) as response: + if response.status == 200: + text_content = await response.text() + try: + data = json.loads(text_content) + if self._validate_tag_definition(data): + new_types[hw_type] = TagType(hw_type, data) + _LOGGER.debug(f"Loaded tag type {hw_type}: {data['name']}") + except json.JSONDecodeError: + _LOGGER.error(f"Invalid JSON in tag type {hw_type}") + except Exception as e: + _LOGGER.error(f"Error loading tag type {hw_type}: {str(e)}") + + if new_types: + self._tag_types = new_types + self._last_update = datetime.now() + _LOGGER.info(f"Successfully loaded {len(new_types)} tag definitions") + await self.save_to_storage() + else: + _LOGGER.error("No valid tag definitions found") + + except Exception as e: + _LOGGER.error(f"Error fetching tag types: {str(e)}") + if not self._tag_types: + # Load built-in fallback for first-time failures + self._load_fallback_types() + await self.save_to_storage() + + def _validate_tag_definition(self, data: Dict) -> bool: + """Validate that a tag definition has required fields.""" + required_fields = {'version', 'name', 'width', 'height'} + return all(field in data for field in required_fields) + + def _load_fallback_types(self) -> None: + """Load basic fallback definitions if fetching fails on first run.""" + self._tag_types = { + 0: {"name": "M2 1.54\"", "width": 152, "height": 152}, + 1: {"name": "M2 2.9\"", "width": 296, "height": 128}, + 2: {"name": "M2 4.2\"", "width": 400, "height": 300}, + 224: {"name": "AP display", "width": 320, "height": 170}, + 225: {"name": "AP display", "width": 160, "height": 80}, + 226: {"name": "LILYGO TPANEL", "width": 480, "height": 480}, + 240: {"name": "Segmented", "width": 0, "height": 0} + } + _LOGGER.warning("Loaded fallback tag definitions") + + async def get_tag_info(self, hw_type: int) -> TagType: + """Get tag information (name, width, height) for a given hardware type.""" + await self.ensure_types_loaded() + tag_def = self._tag_types[hw_type] + return tag_def + + def get_hw_dimensions(self, hw_type: int) -> Tuple[int, int]: + """Get width and height for a hardware type.""" + if hw_type not in self._tag_types: + return 296, 128 # Safe defaults + return self._tag_types[hw_type].width, self._tag_types[hw_type].height + + def get_hw_string(self, hw_type: int) -> str: + """Get the display name for a hardware type.""" + if hw_type not in self._tag_types: + return f"Unknown Type {hw_type}" + return self._tag_types[hw_type].get('name', f'Unknown Type {hw_type}') + + def get_rotate_buffer(self, hw_type: int) -> int: + """Get the rotate buffer for a hardware type.""" + if hw_type not in self._tag_types: + return 0 + return self._tag_types[hw_type].get('rotate_buffer') + + def get_bpp(self, hw_type: int) -> int: + """Get the bits per pixel for a hardware type.""" + if hw_type not in self._tag_types: + return 2 + return self._tag_types[hw_type].get('bpp') + + def get_color_table(self, hw_type: int) -> Dict[str, Tuple[int, int, int]]: + """Get the color table for a hardware type.""" + if hw_type not in self._tag_types: + return { + 'white': (255, 255, 255), + 'black': (0, 0, 0), + 'red': (255, 0, 0), + } + return self._tag_types[hw_type].get('color_table') + + def get_short_lut(self, hw_type: int) -> int: + """Get the short LUT setting for a hardware type.""" + if hw_type not in self._tag_types: + return 2 + return self._tag_types[hw_type].get('short_lut') + + def get_options(self, hw_type: int) -> List[str]: + """Get the options for a hardware type.""" + if hw_type not in self._tag_types: + return [] + return self._tag_types[hw_type].get('options', []) + + def is_in_hw_map(self, hw_type: int) -> bool: + """Check if a hardware type is known.""" + return hw_type in self._tag_types + + def get_all_types(self) -> Dict[int, TagType]: + """Return all known tag types.""" + return self._tag_types.copy() + +# Update the helper functions to be synchronous after initial load +_INSTANCE: Optional[TagTypesManager] = None + +async def get_tag_types_manager(hass: HomeAssistant) -> TagTypesManager: + """Get or create the global TagTypesManager instance.""" + global _INSTANCE + if _INSTANCE is None: + _INSTANCE = TagTypesManager(hass) + await _INSTANCE.ensure_types_loaded() + return _INSTANCE + +def get_hw_dimensions(hw_type: int) -> Tuple[int, int]: + """Get dimensions synchronously.""" + if _INSTANCE is None: + return 296, 128 # Default dimensions + return _INSTANCE.get_hw_dimensions(hw_type) + +def get_hw_string(hw_type: int) -> str: + """Get display name synchronously.""" + if _INSTANCE is None: + return f"Unknown Type {hw_type}" + return _INSTANCE.get_hw_string(hw_type) + +def get_rotate_buffer(hw_type: int) -> int: + """Get rotate buffer synchronously.""" + if _INSTANCE is None: + return 0 + return _INSTANCE.get_rotate_buffer(hw_type) + +def get_bpp(hw_type: int) -> int: + """Get bits per pixel synchronously.""" + if _INSTANCE is None: + return 2 + return _INSTANCE.get_bpp(hw_type) + +def get_color_table(hw_type: int) -> Dict[str, Tuple[int, int, int]]: + """Get color table synchronously.""" + if _INSTANCE is None: + return { + 'white': (255, 255, 255), + 'black': (0, 0, 0), + 'red': (255, 0, 0), + } + return _INSTANCE.get_color_table(hw_type) + +def get_short_lut(hw_type: int) -> int: + """Get short LUT synchronously.""" + if _INSTANCE is None: + return 2 + return _INSTANCE.get_short_lut(hw_type) + +def get_options(hw_type: int) -> List[str]: + """Get options synchronously.""" + if _INSTANCE is None: + return [] + return _INSTANCE.get_options(hw_type) + +def is_in_hw_map(hw_type: int) -> bool: + """Check if hardware type exists synchronously.""" + if _INSTANCE is None: + return False + return _INSTANCE.is_in_hw_map(hw_type)