diff --git a/custom_components/ohme/__init__.py b/custom_components/ohme/__init__.py index 10f6fa5..f21b8db 100644 --- a/custom_components/ohme/__init__.py +++ b/custom_components/ohme/__init__.py @@ -4,11 +4,17 @@ from .const import * from .utils import get_option from ohme import OhmeApiClient -from .coordinator import OhmeChargeSessionsCoordinator, OhmeAccountInfoCoordinator, OhmeAdvancedSettingsCoordinator, OhmeChargeSchedulesCoordinator +from .coordinator import ( + OhmeChargeSessionsCoordinator, + OhmeAccountInfoCoordinator, + OhmeAdvancedSettingsCoordinator, + OhmeChargeSchedulesCoordinator, +) from homeassistant.exceptions import ConfigEntryNotReady _LOGGER = logging.getLogger(__name__) + async def async_setup(hass: core.HomeAssistant, config: dict) -> bool: """Set up the Ohme EV Charger component.""" return True @@ -16,8 +22,8 @@ async def async_setup(hass: core.HomeAssistant, config: dict) -> bool: async def async_setup_dependencies(hass, entry): """Instantiate client and refresh session""" - client = OhmeApiClient(entry.data['email'], entry.data['password']) - account_id = entry.data['email'] + client = OhmeApiClient(entry.data["email"], entry.data["password"]) + account_id = entry.data["email"] hass.data[DOMAIN][account_id][DATA_CLIENT] = client @@ -29,19 +35,19 @@ async def async_setup_dependencies(hass, entry): async def async_update_listener(hass, entry): """Handle options flow credentials update.""" - + # Reload this instance await hass.config_entries.async_reload(entry.entry_id) async def async_setup_entry(hass, entry): """This is called from the config flow.""" - + def _update_unique_id(entry: RegistryEntry) -> dict[str, str] | None: """Update unique IDs from old format.""" if entry.unique_id.startswith("ohme_"): - parts = entry.unique_id.split('_') - legacy_id = '_'.join(parts[2:]) + parts = entry.unique_id.split("_") + legacy_id = "_".join(parts[2:]) if legacy_id in LEGACY_MAPPING: new_id = LEGACY_MAPPING[legacy_id] @@ -55,24 +61,30 @@ def _update_unique_id(entry: RegistryEntry) -> dict[str, str] | None: await async_migrate_entries(hass, entry.entry_id, _update_unique_id) - account_id = entry.data['email'] + account_id = entry.data["email"] - hass.data.setdefault(DOMAIN, {}) + hass.data.setdefault(DOMAIN, {}) hass.data[DOMAIN].setdefault(account_id, {}) await async_setup_dependencies(hass, entry) coordinators = [ - OhmeChargeSessionsCoordinator(hass=hass, account_id=account_id), # COORDINATOR_CHARGESESSIONS - OhmeAccountInfoCoordinator(hass=hass, account_id=account_id), # COORDINATOR_ACCOUNTINFO - OhmeAdvancedSettingsCoordinator(hass=hass, account_id=account_id), # COORDINATOR_ADVANCED - OhmeChargeSchedulesCoordinator(hass=hass, account_id=account_id) # COORDINATOR_SCHEDULES + OhmeChargeSessionsCoordinator( + hass=hass, account_id=account_id + ), # COORDINATOR_CHARGESESSIONS + OhmeAccountInfoCoordinator( + hass=hass, account_id=account_id + ), # COORDINATOR_ACCOUNTINFO + OhmeAdvancedSettingsCoordinator( + hass=hass, account_id=account_id + ), # COORDINATOR_ADVANCED + OhmeChargeSchedulesCoordinator( + hass=hass, account_id=account_id + ), # COORDINATOR_SCHEDULES ] # We can function without these so setup can continue - coordinators_optional = [ - OhmeAdvancedSettingsCoordinator - ] + coordinators_optional = [OhmeAdvancedSettingsCoordinator] for coordinator in coordinators: # Catch failures if this is an 'optional' coordinator @@ -81,10 +93,14 @@ def _update_unique_id(entry: RegistryEntry) -> dict[str, str] | None: except ConfigEntryNotReady as ex: allow_failure = False for optional in coordinators_optional: - allow_failure = True if isinstance(coordinator, optional) else allow_failure + allow_failure = ( + True if isinstance(coordinator, optional) else allow_failure + ) if allow_failure: - _LOGGER.error(f"{coordinator.__class__.__name__} failed to setup. This coordinator is optional so the integration will still function, but please raise an issue if this persists.") + _LOGGER.error( + f"{coordinator.__class__.__name__} failed to setup. This coordinator is optional so the integration will still function, but please raise an issue if this persists." + ) else: raise ex @@ -108,9 +124,11 @@ async def async_migrate_entry(hass: core.HomeAssistant, config_entry) -> bool: """Migrate old entry.""" # Version number has gone backwards if CONFIG_VERSION < config_entry.version: - _LOGGER.error("Backwards migration not possible. Please update the integration.") + _LOGGER.error( + "Backwards migration not possible. Please update the integration." + ) return False - + # Version number has gone up if config_entry.version < CONFIG_VERSION: _LOGGER.debug("Migrating from version %s", config_entry.version) diff --git a/custom_components/ohme/base.py b/custom_components/ohme/base.py index 405794e..21523ed 100644 --- a/custom_components/ohme/base.py +++ b/custom_components/ohme/base.py @@ -1,6 +1,7 @@ from homeassistant.helpers.entity import DeviceInfo, Entity from homeassistant.core import callback + class OhmeEntity(Entity): """Base class for all Ohme entities.""" @@ -22,11 +23,9 @@ async def async_added_to_hass(self) -> None: """When entity is added to hass.""" await super().async_added_to_hass() self.async_on_remove( - self.coordinator.async_add_listener( - self._handle_coordinator_update, None - ) + self.coordinator.async_add_listener(self._handle_coordinator_update, None) ) - + @callback def _handle_coordinator_update(self) -> None: self.async_write_ha_state() diff --git a/custom_components/ohme/binary_sensor.py b/custom_components/ohme/binary_sensor.py index 14ad93d..183790c 100644 --- a/custom_components/ohme/binary_sensor.py +++ b/custom_components/ohme/binary_sensor.py @@ -1,43 +1,56 @@ """Platform for sensor integration.""" + from __future__ import annotations import logging from homeassistant.components.binary_sensor import ( BinarySensorDeviceClass, - BinarySensorEntity + BinarySensorEntity, ) from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.entity import generate_entity_id -from homeassistant.util.dt import (utcnow) -from .const import DOMAIN, DATA_COORDINATORS, DATA_SLOTS, COORDINATOR_CHARGESESSIONS, COORDINATOR_ADVANCED, DATA_CLIENT +from homeassistant.util.dt import utcnow +from .const import ( + DOMAIN, + DATA_COORDINATORS, + DATA_SLOTS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_ADVANCED, + DATA_CLIENT, +) from .utils import in_slot from .base import OhmeEntity _LOGGER = logging.getLogger(__name__) + async def async_setup_entry( hass: core.HomeAssistant, config_entry: config_entries.ConfigEntry, async_add_entities, ): """Setup sensors and configure coordinator.""" - account_id = config_entry.data['email'] + account_id = config_entry.data["email"] client = hass.data[DOMAIN][account_id][DATA_CLIENT] - coordinator = hass.data[DOMAIN][account_id][DATA_COORDINATORS][COORDINATOR_CHARGESESSIONS] - coordinator_advanced = hass.data[DOMAIN][account_id][DATA_COORDINATORS][COORDINATOR_ADVANCED] - - sensors = [ConnectedBinarySensor(coordinator, hass, client), - ChargingBinarySensor(coordinator, hass, client), - PendingApprovalBinarySensor(coordinator, hass, client), - CurrentSlotBinarySensor(coordinator, hass, client), - ChargerOnlineBinarySensor(coordinator_advanced, hass, client)] + coordinator = hass.data[DOMAIN][account_id][DATA_COORDINATORS][ + COORDINATOR_CHARGESESSIONS + ] + coordinator_advanced = hass.data[DOMAIN][account_id][DATA_COORDINATORS][ + COORDINATOR_ADVANCED + ] + + sensors = [ + ConnectedBinarySensor(coordinator, hass, client), + ChargingBinarySensor(coordinator, hass, client), + PendingApprovalBinarySensor(coordinator, hass, client), + CurrentSlotBinarySensor(coordinator, hass, client), + ChargerOnlineBinarySensor(coordinator_advanced, hass, client), + ] async_add_entities(sensors, update_before_add=True) -class ConnectedBinarySensor( - OhmeEntity, - BinarySensorEntity): +class ConnectedBinarySensor(OhmeEntity, BinarySensorEntity): """Binary sensor for if car is plugged in.""" _attr_translation_key = "car_connected" @@ -54,9 +67,7 @@ def is_on(self) -> bool: return self._state -class ChargingBinarySensor( - OhmeEntity, - BinarySensorEntity): +class ChargingBinarySensor(OhmeEntity, BinarySensorEntity): """Binary sensor for if car is charging.""" _attr_translation_key = "car_charging" @@ -64,10 +75,8 @@ class ChargingBinarySensor( _attr_device_class = BinarySensorDeviceClass.BATTERY_CHARGING def __init__( - self, - coordinator: OhmeChargeSessionsCoordinator, - hass: HomeAssistant, - client): + self, coordinator: OhmeChargeSessionsCoordinator, hass: HomeAssistant, client + ): super().__init__(coordinator, hass, client) # Cache the last power readings @@ -86,10 +95,16 @@ def _calculate_state(self) -> bool: power = self.coordinator.data["power"]["watt"] # If no last reading or no batterySoc/power, fallback to power > 0 - if not self._last_reading or not self._last_reading['batterySoc'] or not self._last_reading['power']: - _LOGGER.debug("ChargingBinarySensor: No last reading, defaulting to power > 0") + if ( + not self._last_reading + or not self._last_reading["batterySoc"] + or not self._last_reading["power"] + ): + _LOGGER.debug( + "ChargingBinarySensor: No last reading, defaulting to power > 0" + ) return power > 0 - + # See if we are in a charge slot now and if we were for the last reading in_charge_slot = in_slot(self.coordinator.data) lr_in_charge_slot = self._last_reading_in_slot @@ -103,40 +118,58 @@ def _calculate_state(self) -> bool: # The charge has JUST stopped on the session bounary but the power reading is lagging. # This condition makes sure we get the charge state updated on the tick immediately after charge stop. lr_power = self._last_reading["power"]["watt"] - if lr_in_charge_slot and not in_charge_slot and lr_power > 0 and power / lr_power < 0.6: - _LOGGER.debug("ChargingBinarySensor: Power drop on state boundary, assuming not charging") + if ( + lr_in_charge_slot + and not in_charge_slot + and lr_power > 0 + and power / lr_power < 0.6 + ): + _LOGGER.debug( + "ChargingBinarySensor: Power drop on state boundary, assuming not charging" + ) self._trigger_count = 0 return False - + # Failing that, we use the watt hours field to check charge state: # - If Wh has positive delta # - We have a nonzero power reading # We are charging. Using the power reading isn't ideal - eg. quirk of MG ZS in #13, so need to revisit - wh_delta = self.coordinator.data['batterySoc']['wh'] - self._last_reading['batterySoc']['wh'] + wh_delta = ( + self.coordinator.data["batterySoc"]["wh"] + - self._last_reading["batterySoc"]["wh"] + ) trigger_state = wh_delta > 0 and power > 0 - _LOGGER.debug(f"ChargingBinarySensor: Reading Wh delta of {wh_delta} and power of {power}w") + _LOGGER.debug( + f"ChargingBinarySensor: Reading Wh delta of {wh_delta} and power of {power}w" + ) # If state is going upwards, report straight away if trigger_state and not self._state: - _LOGGER.debug("ChargingBinarySensor: Upwards state change, reporting immediately") + _LOGGER.debug( + "ChargingBinarySensor: Upwards state change, reporting immediately" + ) self._trigger_count = 0 return True # If state is going to change (downwards only for now), we want to see 3 consecutive readings of the state having # changed before reporting it. if self._state != trigger_state: - _LOGGER.debug("ChargingBinarySensor: Downwards state change, incrementing counter") + _LOGGER.debug( + "ChargingBinarySensor: Downwards state change, incrementing counter" + ) self._trigger_count += 1 if self._trigger_count > 2: - _LOGGER.debug("ChargingBinarySensor: Counter hit, publishing downward state change") + _LOGGER.debug( + "ChargingBinarySensor: Counter hit, publishing downward state change" + ) self._trigger_count = 0 return trigger_state else: self._trigger_count = 0 _LOGGER.debug("ChargingBinarySensor: Returning existing state") - + # State hasn't changed or we haven't seen 3 changed values - return existing state return self._state @@ -145,16 +178,24 @@ def _handle_coordinator_update(self) -> None: """Update data.""" # Don't accept updates if 5s hasnt passed # State calculations use deltas that may be unreliable to check if requests are too often - if self._last_updated and (utcnow().timestamp() - self._last_updated.timestamp() < 5): + if self._last_updated and ( + utcnow().timestamp() - self._last_updated.timestamp() < 5 + ): _LOGGER.debug("ChargingBinarySensor: State update too soon - suppressing") return # If we have power info and the car is plugged in, calculate state. Otherwise, false - if self.coordinator.data and self.coordinator.data["power"] and self.coordinator.data['mode'] != "DISCONNECTED": + if ( + self.coordinator.data + and self.coordinator.data["power"] + and self.coordinator.data["mode"] != "DISCONNECTED" + ): self._state = self._calculate_state() else: self._state = False - _LOGGER.debug("ChargingBinarySensor: No power data or car disconnected - reporting False") + _LOGGER.debug( + "ChargingBinarySensor: No power data or car disconnected - reporting False" + ) self._last_reading = self.coordinator.data self._last_updated = utcnow() @@ -162,9 +203,7 @@ def _handle_coordinator_update(self) -> None: self.async_write_ha_state() -class PendingApprovalBinarySensor( - OhmeEntity, - BinarySensorEntity): +class PendingApprovalBinarySensor(OhmeEntity, BinarySensorEntity): """Binary sensor for if a charge is pending approval.""" _attr_translation_key = "pending_approval" @@ -175,15 +214,12 @@ def is_on(self) -> bool: if self.coordinator.data is None: self._state = False else: - self._state = bool( - self.coordinator.data["mode"] == "PENDING_APPROVAL") + self._state = bool(self.coordinator.data["mode"] == "PENDING_APPROVAL") return self._state -class CurrentSlotBinarySensor( - OhmeEntity, - BinarySensorEntity): +class CurrentSlotBinarySensor(OhmeEntity, BinarySensorEntity): """Binary sensor for if we are currently in a smart charge slot.""" _attr_translation_key = "slot_active" @@ -193,11 +229,15 @@ class CurrentSlotBinarySensor( def extra_state_attributes(self): """Attributes of the sensor.""" now = utcnow() - slots = self._hass.data[DOMAIN][self._client.email][DATA_SLOTS] if DATA_SLOTS in self._hass.data[DOMAIN][self._client.email] else [] + slots = ( + self._hass.data[DOMAIN][self._client.email][DATA_SLOTS] + if DATA_SLOTS in self._hass.data[DOMAIN][self._client.email] + else [] + ) return { - "planned_dispatches": [x for x in slots if not x['end'] or x['end'] > now], - "completed_dispatches": [x for x in slots if x['end'] < now] + "planned_dispatches": [x for x in slots if not x["end"] or x["end"] > now], + "completed_dispatches": [x for x in slots if x["end"] < now], } @property @@ -218,9 +258,8 @@ def _handle_coordinator_update(self) -> None: self.async_write_ha_state() -class ChargerOnlineBinarySensor( - OhmeEntity, - BinarySensorEntity): + +class ChargerOnlineBinarySensor(OhmeEntity, BinarySensorEntity): """Binary sensor for if charger is online.""" _attr_translation_key = "charger_online" diff --git a/custom_components/ohme/button.py b/custom_components/ohme/button.py index b815be4..789a795 100644 --- a/custom_components/ohme/button.py +++ b/custom_components/ohme/button.py @@ -13,28 +13,27 @@ async def async_setup_entry( - hass: HomeAssistant, - config_entry: config_entries.ConfigEntry, - async_add_entities + hass: HomeAssistant, config_entry: config_entries.ConfigEntry, async_add_entities ): """Setup switches.""" - account_id = config_entry.data['email'] + account_id = config_entry.data["email"] client = hass.data[DOMAIN][account_id][DATA_CLIENT] - coordinator = hass.data[DOMAIN][account_id][DATA_COORDINATORS][COORDINATOR_CHARGESESSIONS] + coordinator = hass.data[DOMAIN][account_id][DATA_COORDINATORS][ + COORDINATOR_CHARGESESSIONS + ] buttons = [] if client.is_capable("pluginsRequireApprovalMode"): - buttons.append( - OhmeApproveChargeButton(coordinator, hass, client) - ) + buttons.append(OhmeApproveChargeButton(coordinator, hass, client)) async_add_entities(buttons, update_before_add=True) class OhmeApproveChargeButton(OhmeEntity, ButtonEntity): """Button for approving a charge.""" + _attr_translation_key = "approve_charge" _attr_icon = "mdi:check-decagram-outline" diff --git a/custom_components/ohme/config_flow.py b/custom_components/ohme/config_flow.py index 82b94bc..347d5cf 100644 --- a/custom_components/ohme/config_flow.py +++ b/custom_components/ohme/config_flow.py @@ -1,33 +1,35 @@ import voluptuous as vol -from homeassistant.config_entries import (ConfigFlow, OptionsFlow) -from .const import DOMAIN, CONFIG_VERSION, DEFAULT_INTERVAL_CHARGESESSIONS, DEFAULT_INTERVAL_ACCOUNTINFO, DEFAULT_INTERVAL_ADVANCED, DEFAULT_INTERVAL_SCHEDULES +from homeassistant.config_entries import ConfigFlow, OptionsFlow +from .const import ( + DOMAIN, + CONFIG_VERSION, + DEFAULT_INTERVAL_CHARGESESSIONS, + DEFAULT_INTERVAL_ACCOUNTINFO, + DEFAULT_INTERVAL_ADVANCED, + DEFAULT_INTERVAL_SCHEDULES, +) from ohme import OhmeApiClient -USER_SCHEMA = vol.Schema({ - vol.Required("email"): str, - vol.Required("password"): str -}) +USER_SCHEMA = vol.Schema({vol.Required("email"): str, vol.Required("password"): str}) class OhmeConfigFlow(ConfigFlow, domain=DOMAIN): """Config flow.""" + VERSION = CONFIG_VERSION async def async_step_user(self, info): errors = {} if info is not None: - await self.async_set_unique_id(info['email']) + await self.async_set_unique_id(info["email"]) self._abort_if_unique_id_configured() - instance = OhmeApiClient(info['email'], info['password']) + instance = OhmeApiClient(info["email"], info["password"]) if await instance.async_refresh_session() is None: errors["base"] = "auth_error" else: - return self.async_create_entry( - title=info['email'], - data=info - ) + return self.async_create_entry(title=info["email"], data=info) return self.async_show_form( step_id="user", data_schema=USER_SCHEMA, errors=errors @@ -50,19 +52,19 @@ async def async_step_init(self, options): data = self._config_entry.data # Update credentials - if 'email' in options and 'password' in options: - instance = OhmeApiClient(options['email'], options['password']) + if "email" in options and "password" in options: + instance = OhmeApiClient(options["email"], options["password"]) if await instance.async_refresh_session() is None: errors["base"] = "auth_error" else: - data['email'] = options['email'] - data['password'] = options['password'] + data["email"] = options["email"] + data["password"] = options["password"] # If we have no errors, update the data array if len(errors) == 0: # Don't store email and password in options - options.pop('email', None) - options.pop('password', None) + options.pop("email", None) + options.pop("password", None) # Update data self.hass.config_entries.async_update_entry( @@ -70,37 +72,62 @@ async def async_step_init(self, options): ) # Update options - return self.async_create_entry( - title="", - data=options - ) + return self.async_create_entry(title="", data=options) return self.async_show_form( - step_id="init", data_schema=vol.Schema( - { - vol.Required( - "email", default=self._config_entry.data['email'] - ): str, - vol.Optional( - "password" - ): str, - vol.Required( - "never_session_specific", default=self._config_entry.options.get("never_session_specific", False) - ) : bool, - vol.Required( - "never_collapse_slots", default=self._config_entry.options.get("never_collapse_slots", False) - ) : bool, - vol.Required( - "interval_chargesessions", default=self._config_entry.options.get("interval_chargesessions", DEFAULT_INTERVAL_CHARGESESSIONS) - ) : vol.All(vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_CHARGESESSIONS)), - vol.Required( - "interval_accountinfo", default=self._config_entry.options.get("interval_accountinfo", DEFAULT_INTERVAL_ACCOUNTINFO) - ) : vol.All(vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_ACCOUNTINFO)), - vol.Required( - "interval_advanced", default=self._config_entry.options.get("interval_advanced", DEFAULT_INTERVAL_ADVANCED) - ) : vol.All(vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_ADVANCED)), - vol.Required( - "interval_schedules", default=self._config_entry.options.get("interval_schedules", DEFAULT_INTERVAL_SCHEDULES) - ) : vol.All(vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_SCHEDULES)) - }), errors=errors + step_id="init", + data_schema=vol.Schema( + { + vol.Required( + "email", default=self._config_entry.data["email"] + ): str, + vol.Optional("password"): str, + vol.Required( + "never_session_specific", + default=self._config_entry.options.get( + "never_session_specific", False + ), + ): bool, + vol.Required( + "never_collapse_slots", + default=self._config_entry.options.get( + "never_collapse_slots", False + ), + ): bool, + vol.Required( + "interval_chargesessions", + default=self._config_entry.options.get( + "interval_chargesessions", DEFAULT_INTERVAL_CHARGESESSIONS + ), + ): vol.All( + vol.Coerce(float), + vol.Clamp(min=DEFAULT_INTERVAL_CHARGESESSIONS), + ), + vol.Required( + "interval_accountinfo", + default=self._config_entry.options.get( + "interval_accountinfo", DEFAULT_INTERVAL_ACCOUNTINFO + ), + ): vol.All( + vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_ACCOUNTINFO) + ), + vol.Required( + "interval_advanced", + default=self._config_entry.options.get( + "interval_advanced", DEFAULT_INTERVAL_ADVANCED + ), + ): vol.All( + vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_ADVANCED) + ), + vol.Required( + "interval_schedules", + default=self._config_entry.options.get( + "interval_schedules", DEFAULT_INTERVAL_SCHEDULES + ), + ): vol.All( + vol.Coerce(float), vol.Clamp(min=DEFAULT_INTERVAL_SCHEDULES) + ), + } + ), + errors=errors, ) diff --git a/custom_components/ohme/const.py b/custom_components/ohme/const.py index 6667ae3..9a715a8 100644 --- a/custom_components/ohme/const.py +++ b/custom_components/ohme/const.py @@ -1,4 +1,5 @@ """Component constants""" + DOMAIN = "ohme" USER_AGENT = "dan-r-homeassistant-ohme" INTEGRATION_VERSION = "1.1.0" @@ -30,5 +31,5 @@ "buttonsLocked": "lock_buttons", "pluginsRequireApproval": "require_approval", "stealthEnabled": "sleep_when_inactive", - "price_cap_enabled": "enable_price_cap" + "price_cap_enabled": "enable_price_cap", } diff --git a/custom_components/ohme/coordinator.py b/custom_components/ohme/coordinator.py index dd0f75e..02e5ea2 100644 --- a/custom_components/ohme/coordinator.py +++ b/custom_components/ohme/coordinator.py @@ -1,12 +1,16 @@ from datetime import timedelta import logging -from homeassistant.helpers.update_coordinator import ( - DataUpdateCoordinator, - UpdateFailed +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +from .const import ( + DOMAIN, + DATA_CLIENT, + DEFAULT_INTERVAL_CHARGESESSIONS, + DEFAULT_INTERVAL_ACCOUNTINFO, + DEFAULT_INTERVAL_ADVANCED, + DEFAULT_INTERVAL_SCHEDULES, ) - -from .const import DOMAIN, DATA_CLIENT, DEFAULT_INTERVAL_CHARGESESSIONS, DEFAULT_INTERVAL_ACCOUNTINFO, DEFAULT_INTERVAL_ADVANCED, DEFAULT_INTERVAL_SCHEDULES from .utils import get_option _LOGGER = logging.getLogger(__name__) @@ -21,8 +25,13 @@ def __init__(self, hass, account_id): hass, _LOGGER, name="Ohme Charge Sessions", - update_interval=timedelta(minutes= - get_option(hass, account_id, "interval_chargesessions", DEFAULT_INTERVAL_CHARGESESSIONS) + update_interval=timedelta( + minutes=get_option( + hass, + account_id, + "interval_chargesessions", + DEFAULT_INTERVAL_CHARGESESSIONS, + ) ), ) self._client = hass.data[DOMAIN][account_id][DATA_CLIENT] @@ -45,8 +54,13 @@ def __init__(self, hass, account_id): hass, _LOGGER, name="Ohme Account Info", - update_interval=timedelta(minutes= - get_option(hass, account_id, "interval_accountinfo", DEFAULT_INTERVAL_ACCOUNTINFO) + update_interval=timedelta( + minutes=get_option( + hass, + account_id, + "interval_accountinfo", + DEFAULT_INTERVAL_ACCOUNTINFO, + ) ), ) self._client = hass.data[DOMAIN][account_id][DATA_CLIENT] @@ -69,8 +83,10 @@ def __init__(self, hass, account_id): hass, _LOGGER, name="Ohme Advanced Settings", - update_interval=timedelta(minutes= - get_option(hass, account_id, "interval_advanced", DEFAULT_INTERVAL_ADVANCED) + update_interval=timedelta( + minutes=get_option( + hass, account_id, "interval_advanced", DEFAULT_INTERVAL_ADVANCED + ) ), ) self._client = hass.data[DOMAIN][account_id][DATA_CLIENT] @@ -93,8 +109,10 @@ def __init__(self, hass, account_id): hass, _LOGGER, name="Ohme Charge Schedules", - update_interval=timedelta(minutes= - get_option(hass, account_id, "interval_schedules", DEFAULT_INTERVAL_SCHEDULES) + update_interval=timedelta( + minutes=get_option( + hass, account_id, "interval_schedules", DEFAULT_INTERVAL_SCHEDULES + ) ), ) self._client = hass.data[DOMAIN][account_id][DATA_CLIENT] diff --git a/custom_components/ohme/number.py b/custom_components/ohme/number.py index 333c816..dd2b873 100644 --- a/custom_components/ohme/number.py +++ b/custom_components/ohme/number.py @@ -5,26 +5,41 @@ from homeassistant.const import UnitOfTime from homeassistant.helpers.entity import generate_entity_id from homeassistant.core import callback, HomeAssistant -from .const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_ACCOUNTINFO, COORDINATOR_CHARGESESSIONS, COORDINATOR_SCHEDULES +from .const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_ACCOUNTINFO, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_SCHEDULES, +) from .utils import session_in_progress from .base import OhmeEntity async def async_setup_entry( - hass: HomeAssistant, - config_entry: config_entries.ConfigEntry, - async_add_entities + hass: HomeAssistant, config_entry: config_entries.ConfigEntry, async_add_entities ): """Setup switches and configure coordinator.""" - account_id = config_entry.data['email'] + account_id = config_entry.data["email"] coordinators = hass.data[DOMAIN][account_id][DATA_COORDINATORS] client = hass.data[DOMAIN][account_id][DATA_CLIENT] - numbers = [TargetPercentNumber( - coordinators[COORDINATOR_CHARGESESSIONS], coordinators[COORDINATOR_SCHEDULES], hass, client), + numbers = [ + TargetPercentNumber( + coordinators[COORDINATOR_CHARGESESSIONS], + coordinators[COORDINATOR_SCHEDULES], + hass, + client, + ), PreconditioningNumber( - coordinators[COORDINATOR_CHARGESESSIONS], coordinators[COORDINATOR_SCHEDULES], hass, client)] + coordinators[COORDINATOR_CHARGESESSIONS], + coordinators[COORDINATOR_SCHEDULES], + hass, + client, + ), + ] if client.cap_available(): numbers.append( @@ -36,6 +51,7 @@ async def async_setup_entry( class TargetPercentNumber(OhmeEntity, NumberEntity): """Target percentage sensor.""" + _attr_translation_key = "target_percentage" _attr_icon = "mdi:battery-heart" _attr_device_class = NumberDeviceClass.BATTERY @@ -72,10 +88,9 @@ def _handle_coordinator_update(self) -> None: """Get value from data returned from API by coordinator""" # Set with the same logic as reading if session_in_progress(self.hass, self._client.email, self.coordinator.data): - target = round( - self.coordinator.data['appliedRule']['targetPercent']) + target = round(self.coordinator.data["appliedRule"]["targetPercent"]) elif self.coordinator_schedules.data: - target = round(self.coordinator_schedules.data['targetPercent']) + target = round(self.coordinator_schedules.data["targetPercent"]) self._state = target if target > 0 else None @@ -86,6 +101,7 @@ def native_value(self): class PreconditioningNumber(OhmeEntity, NumberEntity): """Preconditioning sensor.""" + _attr_translation_key = "preconditioning" _attr_icon = "mdi:air-conditioner" _attr_device_class = NumberDeviceClass.DURATION @@ -114,14 +130,18 @@ async def async_set_native_value(self, value: float) -> None: if value == 0: await self._client.async_apply_session_rule(pre_condition=False) else: - await self._client.async_apply_session_rule(pre_condition=True, pre_condition_length=int(value)) + await self._client.async_apply_session_rule( + pre_condition=True, pre_condition_length=int(value) + ) await asyncio.sleep(1) await self.coordinator.async_refresh() else: if value == 0: await self._client.async_update_schedule(pre_condition=False) else: - await self._client.async_update_schedule(pre_condition=True, pre_condition_length=int(value)) + await self._client.async_update_schedule( + pre_condition=True, pre_condition_length=int(value) + ) await asyncio.sleep(1) await self.coordinator_schedules.async_refresh() @@ -131,15 +151,25 @@ def _handle_coordinator_update(self) -> None: precondition = None # Set with the same logic as reading if session_in_progress(self.hass, self._client.email, self.coordinator.data): - enabled = self.coordinator.data['appliedRule'].get( - 'preconditioningEnabled', False) - precondition = 0 if not enabled else self.coordinator.data['appliedRule'].get( - 'preconditionLengthMins', None) + enabled = self.coordinator.data["appliedRule"].get( + "preconditioningEnabled", False + ) + precondition = ( + 0 + if not enabled + else self.coordinator.data["appliedRule"].get( + "preconditionLengthMins", None + ) + ) elif self.coordinator_schedules.data: enabled = self.coordinator_schedules.data.get( - 'preconditioningEnabled', False) - precondition = 0 if not enabled else self.coordinator_schedules.data.get( - 'preconditionLengthMins', None) + "preconditioningEnabled", False + ) + precondition = ( + 0 + if not enabled + else self.coordinator_schedules.data.get("preconditionLengthMins", None) + ) self._state = precondition @@ -169,12 +199,8 @@ def native_unit_of_measurement(self): if self.coordinator.data is None: return None - penny_unit = { - "GBP": "p", - "EUR": "c" - } - currency = self.coordinator.data["userSettings"].get( - "currencyCode", "XXX") + penny_unit = {"GBP": "p", "EUR": "c"} + currency = self.coordinator.data["userSettings"].get("currencyCode", "XXX") return penny_unit.get(currency, f"{currency}/100") @@ -183,7 +209,9 @@ def _handle_coordinator_update(self) -> None: """Get value from data returned from API by coordinator""" if self.coordinator.data is not None: try: - self._state = self.coordinator.data["userSettings"]["chargeSettings"][0]["value"] + self._state = self.coordinator.data["userSettings"]["chargeSettings"][ + 0 + ]["value"] except: self._state = None self.async_write_ha_state() diff --git a/custom_components/ohme/sensor.py b/custom_components/ohme/sensor.py index 7c504e6..694551e 100644 --- a/custom_components/ohme/sensor.py +++ b/custom_components/ohme/sensor.py @@ -1,31 +1,46 @@ """Platform for sensor integration.""" + from __future__ import annotations from homeassistant.components.sensor import ( SensorDeviceClass, SensorStateClass, - SensorEntity + SensorEntity, ) import math import logging from homeassistant.helpers.update_coordinator import CoordinatorEntity -from homeassistant.const import UnitOfPower, UnitOfEnergy, UnitOfElectricCurrent, UnitOfElectricPotential, PERCENTAGE +from homeassistant.const import ( + UnitOfPower, + UnitOfEnergy, + UnitOfElectricCurrent, + UnitOfElectricPotential, + PERCENTAGE, +) from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.entity import generate_entity_id -from homeassistant.util.dt import (utcnow) -from .const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, DATA_SLOTS, COORDINATOR_CHARGESESSIONS, COORDINATOR_ADVANCED +from homeassistant.util.dt import utcnow +from .const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + DATA_SLOTS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_ADVANCED, +) from .coordinator import OhmeChargeSessionsCoordinator, OhmeAdvancedSettingsCoordinator from .utils import next_slot, get_option, slot_list, slot_list_str from .base import OhmeEntity _LOGGER = logging.getLogger(__name__) + async def async_setup_entry( hass: core.HomeAssistant, config_entry: config_entries.ConfigEntry, - async_add_entities + async_add_entities, ): """Setup sensors and configure coordinator.""" - account_id = config_entry.data['email'] + account_id = config_entry.data["email"] client = hass.data[DOMAIN][account_id][DATA_CLIENT] coordinators = hass.data[DOMAIN][account_id][DATA_COORDINATORS] @@ -33,21 +48,24 @@ async def async_setup_entry( coordinator = coordinators[COORDINATOR_CHARGESESSIONS] adv_coordinator = coordinators[COORDINATOR_ADVANCED] - sensors = [PowerDrawSensor(coordinator, hass, client), - CurrentDrawSensor(coordinator, hass, client), - VoltageSensor(coordinator, hass, client), - CTSensor(adv_coordinator, hass, client), - EnergyUsageSensor(coordinator, hass, client), - NextSlotEndSensor(coordinator, hass, client), - NextSlotStartSensor(coordinator, hass, client), - SlotListSensor(coordinator, hass, client), - BatterySOCSensor(coordinator, hass, client)] - + sensors = [ + PowerDrawSensor(coordinator, hass, client), + CurrentDrawSensor(coordinator, hass, client), + VoltageSensor(coordinator, hass, client), + CTSensor(adv_coordinator, hass, client), + EnergyUsageSensor(coordinator, hass, client), + NextSlotEndSensor(coordinator, hass, client), + NextSlotStartSensor(coordinator, hass, client), + SlotListSensor(coordinator, hass, client), + BatterySOCSensor(coordinator, hass, client), + ] + async_add_entities(sensors, update_before_add=True) class PowerDrawSensor(OhmeEntity, SensorEntity): """Sensor for car power draw.""" + _attr_translation_key = "power_draw" _attr_icon = "mdi:ev-station" _attr_native_unit_of_measurement = UnitOfPower.WATT @@ -56,13 +74,14 @@ class PowerDrawSensor(OhmeEntity, SensorEntity): @property def native_value(self): """Get value from data returned from API by coordinator""" - if self.coordinator.data and self.coordinator.data['power']: - return self.coordinator.data['power']['watt'] + if self.coordinator.data and self.coordinator.data["power"]: + return self.coordinator.data["power"]["watt"] return 0 class CurrentDrawSensor(OhmeEntity, SensorEntity): """Sensor for car power draw.""" + _attr_translation_key = "current_draw" _attr_icon = "mdi:current-ac" _attr_device_class = SensorDeviceClass.CURRENT @@ -71,13 +90,14 @@ class CurrentDrawSensor(OhmeEntity, SensorEntity): @property def native_value(self): """Get value from data returned from API by coordinator""" - if self.coordinator.data and self.coordinator.data['power']: - return self.coordinator.data['power']['amp'] + if self.coordinator.data and self.coordinator.data["power"]: + return self.coordinator.data["power"]["amp"] return 0 class VoltageSensor(OhmeEntity, SensorEntity): """Sensor for EVSE voltage.""" + _attr_translation_key = "voltage" _attr_icon = "mdi:sine-wave" _attr_device_class = SensorDeviceClass.VOLTAGE @@ -86,13 +106,14 @@ class VoltageSensor(OhmeEntity, SensorEntity): @property def native_value(self): """Get value from data returned from API by coordinator""" - if self.coordinator.data and self.coordinator.data['power']: - return self.coordinator.data['power']['volt'] + if self.coordinator.data and self.coordinator.data["power"]: + return self.coordinator.data["power"]["volt"] return None class CTSensor(OhmeEntity, SensorEntity): """Sensor for car power draw.""" + _attr_translation_key = "ct_reading" _attr_icon = "mdi:gauge" _attr_device_class = SensorDeviceClass.CURRENT @@ -101,11 +122,12 @@ class CTSensor(OhmeEntity, SensorEntity): @property def native_value(self): """Get value from data returned from API by coordinator""" - return self.coordinator.data['clampAmps'] + return self.coordinator.data["clampAmps"] class EnergyUsageSensor(OhmeEntity, SensorEntity): """Sensor for total energy usage.""" + _attr_translation_key = "energy" _attr_icon = "mdi:lightning-bolt-circle" _attr_has_entity_name = True @@ -118,13 +140,15 @@ class EnergyUsageSensor(OhmeEntity, SensorEntity): @callback def _handle_coordinator_update(self) -> None: # Ensure we have data, then ensure value is going up and above 0 - if self.coordinator.data and self.coordinator.data['batterySoc']: + if self.coordinator.data and self.coordinator.data["batterySoc"]: new_state = 0 try: - new_state = self.coordinator.data['chargeGraph']['now']['y'] + new_state = self.coordinator.data["chargeGraph"]["now"]["y"] except BaseException: - _LOGGER.debug("EnergyUsageSensor: ChargeGraph reading failed, falling back to batterySoc") - new_state = self.coordinator.data['batterySoc']['wh'] + _LOGGER.debug( + "EnergyUsageSensor: ChargeGraph reading failed, falling back to batterySoc" + ) + new_state = self.coordinator.data["batterySoc"]["wh"] # Let the state reset to 0, but not drop otherwise if not new_state or new_state <= 0: @@ -132,11 +156,16 @@ def _handle_coordinator_update(self) -> None: self._state = 0 else: # Allow a significant (90%+) drop, even if we dont hit exactly 0 - if self._state and self._state > 0 and new_state > 0 and (new_state / self._state) < 0.1: + if ( + self._state + and self._state > 0 + and new_state > 0 + and (new_state / self._state) < 0.1 + ): self._state = new_state else: self._state = max(0, self._state or 0, new_state) - + _LOGGER.debug("EnergyUsageSensor: New state is %s", self._state) self.async_write_ha_state() @@ -148,6 +177,7 @@ def native_value(self): class NextSlotStartSensor(OhmeEntity, SensorEntity): """Sensor for next smart charge slot start time.""" + _attr_translation_key = "next_slot_start" _attr_icon = "mdi:clock-star-four-points" _attr_device_class = SensorDeviceClass.TIMESTAMP @@ -160,10 +190,15 @@ def native_value(self): @callback def _handle_coordinator_update(self) -> None: """Calculate next timeslot. This is a bit slow so we only update on coordinator data update.""" - if self.coordinator.data is None or self.coordinator.data["mode"] == "DISCONNECTED": + if ( + self.coordinator.data is None + or self.coordinator.data["mode"] == "DISCONNECTED" + ): self._state = None else: - self._state = next_slot(self._hass, self._client.email, self.coordinator.data)['start'] + self._state = next_slot( + self._hass, self._client.email, self.coordinator.data + )["start"] self._last_updated = utcnow() @@ -172,6 +207,7 @@ def _handle_coordinator_update(self) -> None: class NextSlotEndSensor(OhmeEntity, SensorEntity): """Sensor for next smart charge slot end time.""" + _attr_translation_key = "next_slot_end" _attr_icon = "mdi:clock-star-four-points-outline" _attr_device_class = SensorDeviceClass.TIMESTAMP @@ -184,10 +220,15 @@ def native_value(self): @callback def _handle_coordinator_update(self) -> None: """Calculate next timeslot. This is a bit slow so we only update on coordinator data update.""" - if self.coordinator.data is None or self.coordinator.data["mode"] == "DISCONNECTED": + if ( + self.coordinator.data is None + or self.coordinator.data["mode"] == "DISCONNECTED" + ): self._state = None else: - self._state = next_slot(self._hass, self._client.email, self.coordinator.data)['end'] + self._state = next_slot( + self._hass, self._client.email, self.coordinator.data + )["end"] self._last_updated = utcnow() @@ -196,6 +237,7 @@ def _handle_coordinator_update(self) -> None: class SlotListSensor(OhmeEntity, SensorEntity): """Sensor for next smart charge slot end time.""" + _attr_translation_key = "charge_slots" _attr_icon = "mdi:timetable" @@ -207,23 +249,28 @@ def native_value(self): @callback def _handle_coordinator_update(self) -> None: """Get a list of charge slots.""" - if self.coordinator.data is None or self.coordinator.data["mode"] == "DISCONNECTED" or self.coordinator.data["mode"] == "FINISHED_CHARGE": + if ( + self.coordinator.data is None + or self.coordinator.data["mode"] == "DISCONNECTED" + or self.coordinator.data["mode"] == "FINISHED_CHARGE" + ): self._state = None else: slots = slot_list(self.coordinator.data) - + # Store slots for external use self._hass.data[DOMAIN][self._client.email][DATA_SLOTS] = slots # Convert list to text self._state = slot_list_str(self._hass, self._client.email, slots) - + self._last_updated = utcnow() self.async_write_ha_state() class BatterySOCSensor(OhmeEntity, SensorEntity): """Sensor for car battery SOC.""" + _attr_translation_key = "battery_soc" _attr_native_unit_of_measurement = PERCENTAGE _attr_device_class = SensorDeviceClass.BATTERY @@ -243,8 +290,15 @@ def icon(self): @callback def _handle_coordinator_update(self) -> None: """Get value from data returned from API by coordinator""" - if self.coordinator.data and self.coordinator.data['car'] and self.coordinator.data['car']['batterySoc']: - self._state = self.coordinator.data['car']['batterySoc']['percent'] or self.coordinator.data['batterySoc']['percent'] + if ( + self.coordinator.data + and self.coordinator.data["car"] + and self.coordinator.data["car"]["batterySoc"] + ): + self._state = ( + self.coordinator.data["car"]["batterySoc"]["percent"] + or self.coordinator.data["batterySoc"]["percent"] + ) # Don't allow negatives if isinstance(self._state, int) and self._state < 0: diff --git a/custom_components/ohme/switch.py b/custom_components/ohme/switch.py index bb8057f..e2c9855 100644 --- a/custom_components/ohme/switch.py +++ b/custom_components/ohme/switch.py @@ -5,25 +5,27 @@ from homeassistant.core import callback, HomeAssistant from homeassistant.helpers.entity import generate_entity_id -from homeassistant.helpers.update_coordinator import ( - CoordinatorEntity -) +from homeassistant.helpers.update_coordinator import CoordinatorEntity from homeassistant.components.switch import SwitchEntity -from homeassistant.util.dt import (utcnow) - -from .const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_CHARGESESSIONS, COORDINATOR_ACCOUNTINFO +from homeassistant.util.dt import utcnow + +from .const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_ACCOUNTINFO, +) from .base import OhmeEntity _LOGGER = logging.getLogger(__name__) async def async_setup_entry( - hass: HomeAssistant, - config_entry: config_entries.ConfigEntry, - async_add_entities + hass: HomeAssistant, config_entry: config_entries.ConfigEntry, async_add_entities ): """Setup switches and configure coordinator.""" - account_id = config_entry.data['email'] + account_id = config_entry.data["email"] coordinators = hass.data[DOMAIN][account_id][DATA_COORDINATORS] @@ -31,33 +33,48 @@ async def async_setup_entry( accountinfo_coordinator = coordinators[COORDINATOR_ACCOUNTINFO] client = hass.data[DOMAIN][account_id][DATA_CLIENT] - switches = [OhmePauseChargeSwitch(coordinator, hass, client), - OhmeMaxChargeSwitch(coordinator, hass, client) - ] + switches = [ + OhmePauseChargeSwitch(coordinator, hass, client), + OhmeMaxChargeSwitch(coordinator, hass, client), + ] if client.cap_available(): - switches.append( - OhmePriceCapSwitch(accountinfo_coordinator, hass, client) - ) - + switches.append(OhmePriceCapSwitch(accountinfo_coordinator, hass, client)) + if client.solar_capable(): - switches.append( - OhmeSolarBoostSwitch(accountinfo_coordinator, hass, client) - ) + switches.append(OhmeSolarBoostSwitch(accountinfo_coordinator, hass, client)) if client.is_capable("buttonsLockable"): switches.append( OhmeConfigurationSwitch( - accountinfo_coordinator, hass, client, "lock_buttons", "lock", "buttonsLocked") + accountinfo_coordinator, + hass, + client, + "lock_buttons", + "lock", + "buttonsLocked", + ) ) if client.is_capable("pluginsRequireApprovalMode"): switches.append( - OhmeConfigurationSwitch(accountinfo_coordinator, hass, client, - "require_approval", "check-decagram", "pluginsRequireApproval") + OhmeConfigurationSwitch( + accountinfo_coordinator, + hass, + client, + "require_approval", + "check-decagram", + "pluginsRequireApproval", + ) ) if client.is_capable("stealth"): switches.append( - OhmeConfigurationSwitch(accountinfo_coordinator, hass, client, - "sleep_when_inactive", "power-sleep", "stealthEnabled") + OhmeConfigurationSwitch( + accountinfo_coordinator, + hass, + client, + "sleep_when_inactive", + "power-sleep", + "stealthEnabled", + ) ) async_add_entities(switches, update_before_add=True) @@ -65,14 +82,15 @@ async def async_setup_entry( class OhmePauseChargeSwitch(OhmeEntity, SwitchEntity): """Switch for pausing a charge.""" + _attr_translation_key = "pause_charge" _attr_icon = "mdi:pause" @callback def _handle_coordinator_update(self) -> None: """Determine if charge is paused. - We handle this differently to the sensors as the state of this switch - is evaluated only when new data is fetched to stop the switch flicking back then forth.""" + We handle this differently to the sensors as the state of this switch + is evaluated only when new data is fetched to stop the switch flicking back then forth.""" if self.coordinator.data is None: self._attr_is_on = False else: @@ -99,6 +117,7 @@ async def async_turn_off(self): class OhmeMaxChargeSwitch(OhmeEntity, SwitchEntity): """Switch for pausing a charge.""" + _attr_translation_key = "max_charge" _attr_icon = "mdi:battery-arrow-up" @@ -108,8 +127,7 @@ def _handle_coordinator_update(self) -> None: if self.coordinator.data is None: self._attr_is_on = False else: - self._attr_is_on = bool( - self.coordinator.data["mode"] == "MAX_CHARGE") + self._attr_is_on = bool(self.coordinator.data["mode"] == "MAX_CHARGE") self._last_updated = utcnow() @@ -126,7 +144,7 @@ async def async_turn_on(self): async def async_turn_off(self): """Stop max charging. - We are not changing anything, just applying the last rule. No need to supply anything.""" + We are not changing anything, just applying the last rule. No need to supply anything.""" await self._client.async_max_charge(False) await asyncio.sleep(1) @@ -136,7 +154,15 @@ async def async_turn_off(self): class OhmeConfigurationSwitch(OhmeEntity, SwitchEntity): """Switch for changing configuration options.""" - def __init__(self, coordinator, hass: HomeAssistant, client, translation_key, icon, config_key): + def __init__( + self, + coordinator, + hass: HomeAssistant, + client, + translation_key, + icon, + config_key, + ): self._attr_icon = f"mdi:{icon}" self._attr_translation_key = translation_key self._config_key = config_key @@ -208,6 +234,7 @@ async def async_turn_off(self): class OhmePriceCapSwitch(OhmeEntity, SwitchEntity): """Switch for enabling price cap.""" + _attr_translation_key = "enable_price_cap" _attr_icon = "mdi:car-speed-limiter" @@ -218,7 +245,8 @@ def _handle_coordinator_update(self) -> None: self._attr_is_on = None else: self._attr_is_on = bool( - self.coordinator.data["userSettings"]["chargeSettings"][0]["enabled"]) + self.coordinator.data["userSettings"]["chargeSettings"][0]["enabled"] + ) self._last_updated = utcnow() diff --git a/custom_components/ohme/time.py b/custom_components/ohme/time.py index 7d4f306..105aa9f 100644 --- a/custom_components/ohme/time.py +++ b/custom_components/ohme/time.py @@ -4,7 +4,13 @@ from homeassistant.components.time import TimeEntity from homeassistant.helpers.entity import generate_entity_id from homeassistant.core import callback, HomeAssistant -from .const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_CHARGESESSIONS, COORDINATOR_SCHEDULES +from .const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_SCHEDULES, +) from .utils import session_in_progress from datetime import time as dt_time from .base import OhmeEntity @@ -13,24 +19,29 @@ async def async_setup_entry( - hass: HomeAssistant, - config_entry: config_entries.ConfigEntry, - async_add_entities + hass: HomeAssistant, config_entry: config_entries.ConfigEntry, async_add_entities ): """Setup switches and configure coordinator.""" - account_id = config_entry.data['email'] + account_id = config_entry.data["email"] coordinators = hass.data[DOMAIN][account_id][DATA_COORDINATORS] client = hass.data[DOMAIN][account_id][DATA_CLIENT] - numbers = [TargetTime(coordinators[COORDINATOR_CHARGESESSIONS], - coordinators[COORDINATOR_SCHEDULES], hass, client)] + numbers = [ + TargetTime( + coordinators[COORDINATOR_CHARGESESSIONS], + coordinators[COORDINATOR_SCHEDULES], + hass, + client, + ) + ] async_add_entities(numbers, update_before_add=True) class TargetTime(OhmeEntity, TimeEntity): """Target time sensor.""" + _attr_translation_key = "target_time" _attr_id = "target_time" _attr_icon = "mdi:alarm-check" @@ -39,7 +50,7 @@ def __init__(self, coordinator, coordinator_schedules, hass: HomeAssistant, clie super().__init__(coordinator, hass, client) self.coordinator_schedules = coordinator_schedules - + async def async_added_to_hass(self) -> None: """When entity is added to hass.""" await super().async_added_to_hass() @@ -53,11 +64,15 @@ async def async_set_value(self, value: dt_time) -> None: """Update the current value.""" # If session in progress, update this session, if not update the first schedule if session_in_progress(self.hass, self._client.email, self.coordinator.data): - await self._client.async_apply_session_rule(target_time=(int(value.hour), int(value.minute))) + await self._client.async_apply_session_rule( + target_time=(int(value.hour), int(value.minute)) + ) await asyncio.sleep(1) await self.coordinator.async_refresh() else: - await self._client.async_update_schedule(target_time=(int(value.hour), int(value.minute))) + await self._client.async_update_schedule( + target_time=(int(value.hour), int(value.minute)) + ) await asyncio.sleep(1) await self.coordinator_schedules.async_refresh() @@ -67,15 +82,13 @@ def _handle_coordinator_update(self) -> None: # Read with the same logic as setting target = None if session_in_progress(self.hass, self._client.email, self.coordinator.data): - target = self.coordinator.data['appliedRule']['targetTime'] + target = self.coordinator.data["appliedRule"]["targetTime"] elif self.coordinator_schedules.data: - target = self.coordinator_schedules.data['targetTime'] - + target = self.coordinator_schedules.data["targetTime"] + if target: self._state = dt_time( - hour=target // 3600, - minute=(target % 3600) // 60, - second=0 + hour=target // 3600, minute=(target % 3600) // 60, second=0 ) self.async_write_ha_state() diff --git a/custom_components/ohme/utils.py b/custom_components/ohme/utils.py index 1fb5a64..86fa706 100644 --- a/custom_components/ohme/utils.py +++ b/custom_components/ohme/utils.py @@ -17,74 +17,85 @@ def next_slot(hass, account_id, data): # Loop through slots for slot in slots: # Only take the first slot start/end that matches. These are in order. - if end is None and slot['end'] > datetime.now().astimezone(): - end = slot['end'] + if end is None and slot["end"] > datetime.now().astimezone(): + end = slot["end"] - if start is None and slot['start'] > datetime.now().astimezone() and slot['start'] != end: - start = slot['start'] - - if collapse_slots and slot['start'] == end: - end = slot['end'] + if ( + start is None + and slot["start"] > datetime.now().astimezone() + and slot["start"] != end + ): + start = slot["start"] - return { - "start": start, - "end": end - } + if collapse_slots and slot["start"] == end: + end = slot["end"] + + return {"start": start, "end": end} def slot_list(data): """Get list of charge slots.""" - session_slots = data['allSessionSlots'] + session_slots = data["allSessionSlots"] if session_slots is None or len(session_slots) == 0: return [] - + slots = [] wh_tally = 0 - - if 'batterySocBefore' in data and data['batterySocBefore'] is not None and data['batterySocBefore']['wh'] is not None: - wh_tally = data['batterySocBefore']['wh'] # Get the wh value we start from + + if ( + "batterySocBefore" in data + and data["batterySocBefore"] is not None + and data["batterySocBefore"]["wh"] is not None + ): + wh_tally = data["batterySocBefore"]["wh"] # Get the wh value we start from for slot in session_slots: slots.append( { - "start": datetime.utcfromtimestamp(slot['startTimeMs'] / 1000).replace(tzinfo=pytz.utc, microsecond=0).astimezone(), - "end": datetime.utcfromtimestamp(slot['endTimeMs'] / 1000).replace(tzinfo=pytz.utc, microsecond=0).astimezone(), - "charge_in_kwh": -((slot['estimatedSoc']['wh'] - wh_tally) / 1000), # Work out how much we add in just this slot + "start": datetime.utcfromtimestamp(slot["startTimeMs"] / 1000) + .replace(tzinfo=pytz.utc, microsecond=0) + .astimezone(), + "end": datetime.utcfromtimestamp(slot["endTimeMs"] / 1000) + .replace(tzinfo=pytz.utc, microsecond=0) + .astimezone(), + "charge_in_kwh": -( + (slot["estimatedSoc"]["wh"] - wh_tally) / 1000 + ), # Work out how much we add in just this slot "source": "smart-charge", - "location": None + "location": None, } ) - - wh_tally = slot['estimatedSoc']['wh'] + + wh_tally = slot["estimatedSoc"]["wh"] return slots def slot_list_str(hass, account_id, slots): - """Convert slot list to string.""" + """Convert slot list to string.""" + + # Convert list to tuples of times + t_slots = [] + for slot in slots: + t_slots.append((slot["start"].strftime("%H:%M"), slot["end"].strftime("%H:%M"))) - # Convert list to tuples of times - t_slots = [] - for slot in slots: - t_slots.append((slot['start'].strftime('%H:%M'), slot['end'].strftime('%H:%M'))) + state = [] - state = [] + if not get_option(hass, account_id, "never_collapse_slots", False): + # Collapse slots so consecutive slots become one + for i in range(len(t_slots)): + if not state or state[-1][1] != t_slots[i][0]: + state.append(t_slots[i]) + else: + state[-1] = (state[-1][0], t_slots[i][1]) + else: + state = t_slots - if not get_option(hass, account_id, "never_collapse_slots", False): - # Collapse slots so consecutive slots become one - for i in range(len(t_slots)): - if not state or state[-1][1] != t_slots[i][0]: - state.append(t_slots[i]) - else: - state[-1] = (state[-1][0], t_slots[i][1]) - else: - state = t_slots - - # Convert list of tuples to string - state = reduce(lambda acc, slot: acc + f"{slot[0]}-{slot[1]}, ", state, "")[:-2] + # Convert list of tuples to string + state = reduce(lambda acc, slot: acc + f"{slot[0]}-{slot[1]}, ", state, "")[:-2] - # Make sure we return None/Unknown if the list is empty - return None if state == "" else state + # Make sure we return None/Unknown if the list is empty + return None if state == "" else state def in_slot(data): @@ -94,9 +105,12 @@ def in_slot(data): # Loop through slots for slot in slots: # If we are in one - if slot['start'] < datetime.now().astimezone() and slot['end'] > datetime.now().astimezone(): + if ( + slot["start"] < datetime.now().astimezone() + and slot["end"] > datetime.now().astimezone() + ): return True - + return False @@ -112,19 +126,19 @@ def time_next_occurs(hour, minute): def session_in_progress(hass, account_id, data): """Is there a session in progress? - Used to check if we should update the current session rather than the first schedule.""" + Used to check if we should update the current session rather than the first schedule.""" # If config option set, never update session specific schedule if get_option(hass, account_id, "never_session_specific"): return False - + # Default to False with no data if not data: return False - + # Car disconnected or pending approval, we should update the schedule - if data['mode'] == "DISCONNECTED" or data['mode'] == "PENDING_APPROVAL": + if data["mode"] == "DISCONNECTED" or data["mode"] == "PENDING_APPROVAL": return False - + return True diff --git a/tests/__init__.py b/tests/__init__.py index 60defa5..75c4fbd 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1 +1 @@ -"""Tests for the Ohme integration.""" \ No newline at end of file +"""Tests for the Ohme integration.""" diff --git a/tests/conftest.py b/tests/conftest.py index 5978791..5e7d8c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,14 @@ """Global fixtures for custom integration.""" + import pytest import pytest_socket + @pytest.fixture(autouse=True) def auto_enable_custom_integrations(enable_custom_integrations): """Enable custom integrations defined in the test dir.""" yield + def enable_external_sockets(): pytest_socket.enable_socket() diff --git a/tests/test_binary_sensor.py b/tests/test_binary_sensor.py index 4468a28..c440c10 100644 --- a/tests/test_binary_sensor.py +++ b/tests/test_binary_sensor.py @@ -1,9 +1,16 @@ import pytest from unittest.mock import AsyncMock, MagicMock from homeassistant.core import HomeAssistant -from homeassistant.util.dt import (utcnow) +from homeassistant.util.dt import utcnow from homeassistant.helpers.update_coordinator import DataUpdateCoordinator -from custom_components.ohme.const import DOMAIN, DATA_CLIENT, DATA_SLOTS, DATA_COORDINATORS, COORDINATOR_CHARGESESSIONS, COORDINATOR_ADVANCED +from custom_components.ohme.const import ( + DOMAIN, + DATA_CLIENT, + DATA_SLOTS, + DATA_COORDINATORS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_ADVANCED, +) from custom_components.ohme.binary_sensor import ( ConnectedBinarySensor, @@ -13,6 +20,7 @@ ChargerOnlineBinarySensor, ) + @pytest.fixture def mock_hass(): hass = MagicMock(spec=HomeAssistant) @@ -23,22 +31,25 @@ def mock_hass(): DATA_COORDINATORS: { COORDINATOR_CHARGESESSIONS: MagicMock(spec=DataUpdateCoordinator), COORDINATOR_ADVANCED: MagicMock(spec=DataUpdateCoordinator), - } + }, } } } return hass + @pytest.fixture def mock_coordinator(): return MagicMock(spec=DataUpdateCoordinator) + @pytest.fixture def mock_client(): mock = MagicMock() mock.email = "test_account" return mock + def test_connected_binary_sensor(mock_hass, mock_coordinator, mock_client): sensor = ConnectedBinarySensor(mock_coordinator, mock_hass, mock_client) mock_coordinator.data = {"mode": "CONNECTED"} @@ -47,12 +58,19 @@ def test_connected_binary_sensor(mock_hass, mock_coordinator, mock_client): mock_coordinator.data = {"mode": "DISCONNECTED"} assert sensor.is_on is False + def test_charging_binary_sensor(mock_hass, mock_coordinator, mock_client): sensor = ChargingBinarySensor(mock_coordinator, mock_hass, mock_client) - mock_coordinator.data = {"power": {"watt": 100}, "batterySoc": {"wh": 50}, "mode": "CONNECTED", "allSessionSlots": []} + mock_coordinator.data = { + "power": {"watt": 100}, + "batterySoc": {"wh": 50}, + "mode": "CONNECTED", + "allSessionSlots": [], + } sensor._last_reading = {"power": {"watt": 100}, "batterySoc": {"wh": 40}} assert sensor._calculate_state() is True + def test_pending_approval_binary_sensor(mock_hass, mock_coordinator, mock_client): sensor = PendingApprovalBinarySensor(mock_coordinator, mock_hass, mock_client) mock_coordinator.data = {"mode": "PENDING_APPROVAL"} @@ -61,6 +79,7 @@ def test_pending_approval_binary_sensor(mock_hass, mock_coordinator, mock_client mock_coordinator.data = {"mode": "CONNECTED"} assert sensor.is_on is False + def test_charger_online_binary_sensor(mock_hass, mock_coordinator, mock_client): sensor = ChargerOnlineBinarySensor(mock_coordinator, mock_hass, mock_client) mock_coordinator.data = {"online": True} diff --git a/tests/test_button.py b/tests/test_button.py index 64dd7bb..aed7f59 100644 --- a/tests/test_button.py +++ b/tests/test_button.py @@ -4,23 +4,25 @@ from homeassistant.helpers import entity_registry as er from homeassistant.setup import async_setup_component from custom_components.ohme.button import async_setup_entry, OhmeApproveChargeButton -from custom_components.ohme.const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_CHARGESESSIONS +from custom_components.ohme.const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_CHARGESESSIONS, +) + @pytest.fixture def mock_hass(): hass = MagicMock() - hass.data = { - DOMAIN: { - 'test_account': { - - } - } - } + hass.data = {DOMAIN: {"test_account": {}}} return hass + @pytest.fixture def mock_config_entry(): - return AsyncMock(data={'email': 'test@example.com'}) + return AsyncMock(data={"email": "test@example.com"}) + @pytest.fixture def mock_client(): @@ -29,32 +31,34 @@ def mock_client(): client.async_approve_charge = AsyncMock() return client + @pytest.fixture def mock_coordinator(): coordinator = AsyncMock() coordinator.async_refresh = AsyncMock() return coordinator + @pytest.fixture def setup_hass(mock_hass, mock_config_entry, mock_client, mock_coordinator): mock_hass.data = { DOMAIN: { - 'test@example.com': { + "test@example.com": { DATA_CLIENT: mock_client, - DATA_COORDINATORS: { - COORDINATOR_CHARGESESSIONS: mock_coordinator - } + DATA_COORDINATORS: {COORDINATOR_CHARGESESSIONS: mock_coordinator}, } } } return mock_hass + @pytest.mark.asyncio async def test_async_setup_entry(setup_hass, mock_config_entry): async_add_entities = AsyncMock() await async_setup_entry(setup_hass, mock_config_entry, async_add_entities) assert async_add_entities.call_count == 1 + @pytest.mark.asyncio async def test_ohme_approve_charge_button(setup_hass, mock_client, mock_coordinator): button = OhmeApproveChargeButton(mock_coordinator, setup_hass, mock_client) diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py index 8a0b095..cbc8711 100644 --- a/tests/test_config_flow.py +++ b/tests/test_config_flow.py @@ -1,4 +1,5 @@ """Tests for the config flow.""" + from unittest import mock from homeassistant.const import CONF_ACCESS_TOKEN, CONF_NAME, CONF_PATH import pytest @@ -7,22 +8,23 @@ from custom_components.ohme import config_flow from custom_components.ohme.const import DOMAIN + async def test_step_account(hass): """Test the initialization of the form in the first step of the config flow.""" result = await hass.config_entries.flow.async_init( config_flow.DOMAIN, context={"source": "user"} ) - - expected = { - 'type': 'form', - 'flow_id': mock.ANY, - 'handler': 'ohme', - 'step_id': 'user', - 'data_schema': config_flow.USER_SCHEMA, - 'errors': {}, - 'description_placeholders': None, - 'last_step': None, - 'preview': None + + expected = { + "type": "form", + "flow_id": mock.ANY, + "handler": "ohme", + "step_id": "user", + "data_schema": config_flow.USER_SCHEMA, + "errors": {}, + "description_placeholders": None, + "last_step": None, + "preview": None, } assert expected == result diff --git a/tests/test_number.py b/tests/test_number.py index 68ef0fc..f8bb713 100644 --- a/tests/test_number.py +++ b/tests/test_number.py @@ -1,7 +1,14 @@ import pytest from unittest.mock import AsyncMock, MagicMock, patch from homeassistant.core import HomeAssistant -from custom_components.ohme.const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_ACCOUNTINFO, COORDINATOR_CHARGESESSIONS, COORDINATOR_SCHEDULES +from custom_components.ohme.const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_ACCOUNTINFO, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_SCHEDULES, +) from custom_components.ohme.number import ( async_setup_entry, @@ -10,67 +17,88 @@ PriceCapNumber, ) + @pytest.fixture def mock_hass(): - hass = MagicMock(data = { - DOMAIN: { - "test@example.com": { - DATA_COORDINATORS: [ - AsyncMock(), - AsyncMock(), - AsyncMock(), - AsyncMock() - ], - DATA_CLIENT: AsyncMock() + hass = MagicMock( + data={ + DOMAIN: { + "test@example.com": { + DATA_COORDINATORS: [ + AsyncMock(), + AsyncMock(), + AsyncMock(), + AsyncMock(), + ], + DATA_CLIENT: AsyncMock(), + } } } - }) + ) return hass + @pytest.fixture def mock_config_entry(): return AsyncMock(data={"email": "test@example.com"}) + @pytest.fixture def mock_async_add_entities(): return AsyncMock() + @pytest.mark.asyncio async def test_async_setup_entry(mock_hass, mock_config_entry, mock_async_add_entities): await async_setup_entry(mock_hass, mock_config_entry, mock_async_add_entities) assert mock_async_add_entities.call_count == 1 + @pytest.mark.asyncio async def test_target_percent_number(mock_hass): - coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][COORDINATOR_CHARGESESSIONS] - coordinator_schedules = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][COORDINATOR_SCHEDULES] + coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][ + COORDINATOR_CHARGESESSIONS + ] + coordinator_schedules = mock_hass.data[DOMAIN]["test@example.com"][ + DATA_COORDINATORS + ][COORDINATOR_SCHEDULES] client = mock_hass.data[DOMAIN]["test@example.com"][DATA_CLIENT] number = TargetPercentNumber(coordinator, coordinator_schedules, mock_hass, client) - with patch('custom_components.ohme.number.session_in_progress', return_value=True): + with patch("custom_components.ohme.number.session_in_progress", return_value=True): await number.async_added_to_hass() await number.async_set_native_value(50) assert number._state is None or number._state == 50 + @pytest.mark.asyncio async def test_preconditioning_number(mock_hass): - coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][COORDINATOR_CHARGESESSIONS] - coordinator_schedules = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][COORDINATOR_SCHEDULES] + coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][ + COORDINATOR_CHARGESESSIONS + ] + coordinator_schedules = mock_hass.data[DOMAIN]["test@example.com"][ + DATA_COORDINATORS + ][COORDINATOR_SCHEDULES] client = mock_hass.data[DOMAIN]["test@example.com"][DATA_CLIENT] - number = PreconditioningNumber(coordinator, coordinator_schedules, mock_hass, client) + number = PreconditioningNumber( + coordinator, coordinator_schedules, mock_hass, client + ) - with patch('custom_components.ohme.number.session_in_progress', return_value=True): + with patch("custom_components.ohme.number.session_in_progress", return_value=True): await number.async_added_to_hass() await number.async_set_native_value(30) assert number._state is None or number._state == 30 + @pytest.mark.asyncio async def test_price_cap_number(mock_hass): - coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][COORDINATOR_ACCOUNTINFO] + coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][ + COORDINATOR_ACCOUNTINFO + ] client = mock_hass.data[DOMAIN]["test@example.com"][DATA_CLIENT] number = PriceCapNumber(coordinator, mock_hass, client) diff --git a/tests/test_sensor.py b/tests/test_sensor.py index 1727a13..6271d63 100644 --- a/tests/test_sensor.py +++ b/tests/test_sensor.py @@ -2,12 +2,14 @@ from unittest.mock import MagicMock from custom_components.ohme.sensor import VoltageSensor + @pytest.fixture def mock_coordinator(): """Fixture for creating a mock coordinator.""" coordinator = MagicMock() return coordinator + @pytest.fixture def voltage_sensor(mock_coordinator): """Fixture for creating a VoltageSensor instance.""" @@ -15,17 +17,20 @@ def voltage_sensor(mock_coordinator): client = MagicMock() return VoltageSensor(mock_coordinator, hass, client) + def test_voltage_sensor_native_value_with_data(voltage_sensor, mock_coordinator): """Test native_value when coordinator has data.""" - mock_coordinator.data = {'power': {'volt': 230}} + mock_coordinator.data = {"power": {"volt": 230}} assert voltage_sensor.native_value == 230 + def test_voltage_sensor_native_value_no_data(voltage_sensor, mock_coordinator): """Test native_value when coordinator has no data.""" mock_coordinator.data = None assert voltage_sensor.native_value is None + def test_voltage_sensor_native_value_no_power_data(voltage_sensor, mock_coordinator): """Test native_value when coordinator has no power data.""" - mock_coordinator.data = {'power': None} + mock_coordinator.data = {"power": None} assert voltage_sensor.native_value is None diff --git a/tests/test_switch.py b/tests/test_switch.py index 800a30e..062b1ca 100644 --- a/tests/test_switch.py +++ b/tests/test_switch.py @@ -2,7 +2,13 @@ from unittest.mock import AsyncMock, patch from homeassistant.core import HomeAssistant from homeassistant.helpers.update_coordinator import DataUpdateCoordinator -from custom_components.ohme.const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_CHARGESESSIONS, COORDINATOR_ACCOUNTINFO +from custom_components.ohme.const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_ACCOUNTINFO, +) from custom_components.ohme.switch import ( async_setup_entry, @@ -13,40 +19,50 @@ OhmePriceCapSwitch, ) + @pytest.fixture def mock_hass(): return AsyncMock(spec=HomeAssistant) + @pytest.fixture def mock_client(): client = AsyncMock() client.cap_available.return_value = True client.solar_capable.return_value = True - client.is_capable.side_effect = lambda x: x in ["buttonsLockable", "pluginsRequireApprovalMode", "stealth"] + client.is_capable.side_effect = lambda x: x in [ + "buttonsLockable", + "pluginsRequireApprovalMode", + "stealth", + ] return client + @pytest.fixture def mock_coordinator(): return AsyncMock(spec=DataUpdateCoordinator) + @pytest.fixture def mock_config_entry(): - return AsyncMock(data={'email': 'test@example.com'}) + return AsyncMock(data={"email": "test@example.com"}) + @pytest.fixture def setup_hass_data(mock_hass, mock_client, mock_coordinator): mock_hass.data = { DOMAIN: { - 'test@example.com': { + "test@example.com": { DATA_CLIENT: mock_client, DATA_COORDINATORS: { COORDINATOR_CHARGESESSIONS: mock_coordinator, COORDINATOR_ACCOUNTINFO: mock_coordinator, - } + }, } } } + @pytest.mark.asyncio async def test_async_setup_entry(mock_hass, mock_config_entry, setup_hass_data): async_add_entities = AsyncMock() @@ -54,6 +70,7 @@ async def test_async_setup_entry(mock_hass, mock_config_entry, setup_hass_data): assert async_add_entities.call_count == 1 assert len(async_add_entities.call_args[0][0]) == 7 + @pytest.mark.asyncio async def test_ohme_pause_charge_switch(mock_hass, mock_client, mock_coordinator): switch = OhmePauseChargeSwitch(mock_coordinator, mock_hass, mock_client) @@ -62,6 +79,7 @@ async def test_ohme_pause_charge_switch(mock_hass, mock_client, mock_coordinator await switch.async_turn_off() mock_client.async_resume_charge.assert_called_once() + @pytest.mark.asyncio async def test_ohme_max_charge_switch(mock_hass, mock_client, mock_coordinator): switch = OhmeMaxChargeSwitch(mock_coordinator, mock_hass, mock_client) @@ -71,23 +89,41 @@ async def test_ohme_max_charge_switch(mock_hass, mock_client, mock_coordinator): await switch.async_turn_off() mock_client.async_max_charge.assert_called_once_with(False) + @pytest.mark.asyncio async def test_ohme_configuration_switch(mock_hass, mock_client, mock_coordinator): - switch = OhmeConfigurationSwitch(mock_coordinator, mock_hass, mock_client, "lock_buttons", "lock", "buttonsLocked") + switch = OhmeConfigurationSwitch( + mock_coordinator, + mock_hass, + mock_client, + "lock_buttons", + "lock", + "buttonsLocked", + ) await switch.async_turn_on() - mock_client.async_set_configuration_value.assert_called_once_with({"buttonsLocked": True}) + mock_client.async_set_configuration_value.assert_called_once_with( + {"buttonsLocked": True} + ) mock_client.async_set_configuration_value.reset_mock() await switch.async_turn_off() - mock_client.async_set_configuration_value.assert_called_once_with({"buttonsLocked": False}) + mock_client.async_set_configuration_value.assert_called_once_with( + {"buttonsLocked": False} + ) + @pytest.mark.asyncio async def test_ohme_solar_boost_switch(mock_hass, mock_client, mock_coordinator): switch = OhmeSolarBoostSwitch(mock_coordinator, mock_hass, mock_client) await switch.async_turn_on() - mock_client.async_set_configuration_value.assert_called_once_with({"solarMode": "ZERO_EXPORT"}) + mock_client.async_set_configuration_value.assert_called_once_with( + {"solarMode": "ZERO_EXPORT"} + ) mock_client.async_set_configuration_value.reset_mock() await switch.async_turn_off() - mock_client.async_set_configuration_value.assert_called_once_with({"solarMode": "IGNORE"}) + mock_client.async_set_configuration_value.assert_called_once_with( + {"solarMode": "IGNORE"} + ) + @pytest.mark.asyncio async def test_ohme_price_cap_switch(mock_hass, mock_client, mock_coordinator): diff --git a/tests/test_time.py b/tests/test_time.py index 906ad8b..713116d 100644 --- a/tests/test_time.py +++ b/tests/test_time.py @@ -4,63 +4,87 @@ from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import Entity from custom_components.ohme.time import async_setup_entry, TargetTime -from custom_components.ohme.const import DOMAIN, DATA_CLIENT, DATA_COORDINATORS, COORDINATOR_CHARGESESSIONS, COORDINATOR_SCHEDULES +from custom_components.ohme.const import ( + DOMAIN, + DATA_CLIENT, + DATA_COORDINATORS, + COORDINATOR_CHARGESESSIONS, + COORDINATOR_SCHEDULES, +) + @pytest.fixture def mock_hass(): hass = MagicMock() hass.data = { DOMAIN: { - 'test@example.com': { + "test@example.com": { DATA_COORDINATORS: [ MagicMock(async_refresh=AsyncMock()), MagicMock(async_refresh=AsyncMock()), MagicMock(async_refresh=AsyncMock()), - MagicMock(async_refresh=AsyncMock()) + MagicMock(async_refresh=AsyncMock()), ], - DATA_CLIENT: MagicMock(async_apply_session_rule=AsyncMock(), async_update_schedule=AsyncMock()) - + DATA_CLIENT: MagicMock( + async_apply_session_rule=AsyncMock(), + async_update_schedule=AsyncMock(), + ), } } } return hass + @pytest.fixture def mock_config_entry(): - return AsyncMock(data={'email': 'test@example.com'}) + return AsyncMock(data={"email": "test@example.com"}) + @pytest.fixture def mock_async_add_entities(): return AsyncMock() + @pytest.mark.asyncio async def test_async_setup_entry(mock_hass, mock_config_entry, mock_async_add_entities): await async_setup_entry(mock_hass, mock_config_entry, mock_async_add_entities) assert mock_async_add_entities.called + @pytest.fixture def target_time_entity(mock_hass): - coordinator = mock_hass.data[DOMAIN]['test@example.com'][DATA_COORDINATORS][COORDINATOR_CHARGESESSIONS] - coordinator_schedules = mock_hass.data[DOMAIN]['test@example.com'][DATA_COORDINATORS][COORDINATOR_SCHEDULES] - client = mock_hass.data[DOMAIN]['test@example.com'][DATA_CLIENT] + coordinator = mock_hass.data[DOMAIN]["test@example.com"][DATA_COORDINATORS][ + COORDINATOR_CHARGESESSIONS + ] + coordinator_schedules = mock_hass.data[DOMAIN]["test@example.com"][ + DATA_COORDINATORS + ][COORDINATOR_SCHEDULES] + client = mock_hass.data[DOMAIN]["test@example.com"][DATA_CLIENT] return TargetTime(coordinator, coordinator_schedules, mock_hass, client) + @pytest.mark.asyncio async def test_async_added_to_hass(target_time_entity): - with patch.object(target_time_entity.coordinator_schedules, 'async_add_listener', return_value=AsyncMock()) as mock_add_listener: + with patch.object( + target_time_entity.coordinator_schedules, + "async_add_listener", + return_value=AsyncMock(), + ) as mock_add_listener: await target_time_entity.async_added_to_hass() assert mock_add_listener.called + @pytest.mark.asyncio async def test_async_set_value(target_time_entity): - with patch('custom_components.ohme.time.session_in_progress', return_value=True): + with patch("custom_components.ohme.time.session_in_progress", return_value=True): await target_time_entity.async_set_value(dt_time(12, 30)) assert target_time_entity._client.async_apply_session_rule.called - with patch('custom_components.ohme.time.session_in_progress', return_value=False): + with patch("custom_components.ohme.time.session_in_progress", return_value=False): await target_time_entity.async_set_value(dt_time(12, 30)) assert target_time_entity._client.async_update_schedule.called + def test_native_value(target_time_entity): target_time_entity._state = dt_time(12, 30) assert target_time_entity.native_value == dt_time(12, 30) diff --git a/tests/test_utils.py b/tests/test_utils.py index ff9ec9b..3bc9409 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,5 @@ """Tests for the utils.""" + from unittest import mock import random from time import time