diff --git a/custom_components/econnect_metronet/devices.py b/custom_components/econnect_metronet/devices.py index 4e8cb60..cab83b1 100644 --- a/custom_components/econnect_metronet/devices.py +++ b/custom_components/econnect_metronet/devices.py @@ -26,6 +26,7 @@ CONF_AREAS_ARM_VACATION, NOTIFICATION_MESSAGE, ) +from .helpers import split_code _LOGGER = logging.getLogger(__name__) @@ -291,7 +292,13 @@ def update(self): def arm(self, code, sectors=None): try: - with self._connection.lock(code): + # Detect if the user is trying to arm a system that requires a user ID + if not self.panel.get("login_without_user_id", True): + user_id, code = split_code(code) + else: + user_id = None + + with self._connection.lock(code, user_id=user_id): self._connection.arm(sectors=sectors) self.state = STATE_ALARM_ARMED_AWAY except HTTPError as err: @@ -309,7 +316,13 @@ def arm(self, code, sectors=None): def disarm(self, code, sectors=None): try: - with self._connection.lock(code): + # Detect if the user is trying to arm a system that requires a user ID + if not self.panel.get("login_without_user_id", True): + user_id, code = split_code(code) + else: + user_id = None + + with self._connection.lock(code, user_id=user_id): self._connection.disarm(sectors=sectors) self.state = STATE_ALARM_DISARMED except HTTPError as err: diff --git a/custom_components/econnect_metronet/helpers.py b/custom_components/econnect_metronet/helpers.py index fc4ef2e..0b46c4f 100644 --- a/custom_components/econnect_metronet/helpers.py +++ b/custom_components/econnect_metronet/helpers.py @@ -1,6 +1,7 @@ from typing import List, Tuple, Union import voluptuous as vol +from elmo.api.exceptions import CodeError from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_USERNAME from homeassistant.helpers.config_validation import multi_select @@ -81,3 +82,30 @@ def generate_entity_id(config: ConfigEntry, name: Union[str, None] = None) -> st # See: https://www.home-assistant.io/faq/unique_id/#can-be-changed entity_name = slugify(f"{system_name}_{additional_name}") return f"{DOMAIN}.{DOMAIN}_{entity_name}" + + +def split_code(code: str) -> Tuple[str, str]: + """Splits the given code into two parts: user ID and code. + + The function returns a tuple containing the user ID and the code as separate strings. + The code is expected to be in the format without spaces, with the CODE + part being the last 6 characters of the string. + + Args: + code: A string representing the combined user ID and code. + + Returns: + A tuple of two strings: (user ID, code). + + Raises: + CodeError: If the input code is less than 7 characters long, indicating it does not + conform to the expected format. + """ + if len(code) <= 6: + raise CodeError("Your code must be in the format without spaces.") + + user_id_part, code_part = code[:-6], code[-6:] + if not (user_id_part.isdigit() and code_part.isdigit()): + raise CodeError("Both user ID and code must be numbers.") + + return user_id_part, code_part diff --git a/custom_components/econnect_metronet/manifest.json b/custom_components/econnect_metronet/manifest.json index b72c80b..a5124c0 100644 --- a/custom_components/econnect_metronet/manifest.json +++ b/custom_components/econnect_metronet/manifest.json @@ -16,7 +16,7 @@ "elmo" ], "requirements": [ - "econnect-python==0.11.0b0" + "econnect-python==0.11.0" ], "version": "2.2.1" } diff --git a/pyproject.toml b/pyproject.toml index 5adbe4e..c9b1a10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", ] dependencies = [ - "econnect-python==0.11.0b0", + "econnect-python==0.11.0", "async_timeout", "homeassistant", ] diff --git a/tests/test_devices.py b/tests/test_devices.py index d9da8eb..4db073f 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -1133,18 +1133,71 @@ def test_device_update_query_not_valid(client, mocker): assert device.update() is None -def test_device_arm_success(client, mocker): +def test_device_arm_success(alarm_device, mocker): """Should arm the e-connect system using the underlying client.""" - device = AlarmDevice(client) - mocker.spy(device._connection, "lock") - mocker.spy(device._connection, "arm") + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "arm") # Test - device._connection._session_id = "test" - device.arm("1234", sectors=[4]) - assert device._connection.lock.call_count == 1 - assert device._connection.arm.call_count == 1 - assert "1234" in device._connection.lock.call_args[0] - assert {"sectors": [4]} == device._connection.arm.call_args[1] + alarm_device.arm("1234", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.arm.call_count == 1 + assert "1234" in alarm_device._connection.lock.call_args[0] + assert {"sectors": [4]} == alarm_device._connection.arm.call_args[1] + + +def test_device_arm_success_without_panel_details(alarm_device, mocker): + """Should assume `userId` is not required if panel details are empty.""" + alarm_device._inventory = {} + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "arm") + # Test + alarm_device._connection._session_id = "test" + alarm_device.arm("1234", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.arm.call_count == 1 + assert "1234" in alarm_device._connection.lock.call_args[0] + assert {"sectors": [4]} == alarm_device._connection.arm.call_args[1] + + +def test_device_arm_success_with_user_id(alarm_device, mocker): + """Should split the code if the login with `userId` is required.""" + alarm_device._inventory[0]["login_without_user_id"] = False + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "arm") + # Test + alarm_device._connection._session_id = "test" + alarm_device.arm("001123456", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.arm.call_count == 1 + assert "123456" in alarm_device._connection.lock.call_args[0] + assert {"user_id": "001"} == alarm_device._connection.lock.call_args[1] + assert {"sectors": [4]} == alarm_device._connection.arm.call_args[1] + + +def test_device_arm_success_user_id_not_required(alarm_device, mocker): + """Should not split the code if the login with `userId` is not required.""" + alarm_device._inventory[0]["login_without_user_id"] = True + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "arm") + # Test + alarm_device._connection._session_id = "test" + alarm_device.arm("123456", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.arm.call_count == 1 + assert "123456" in alarm_device._connection.lock.call_args[0] + assert {"user_id": None} == alarm_device._connection.lock.call_args[1] + assert {"sectors": [4]} == alarm_device._connection.arm.call_args[1] + + +def test_device_arm_code_error_with_user_id(alarm_device, mocker): + """Should raise an error if the code can't be split in `userId` and `code`.""" + alarm_device._inventory[0]["login_without_user_id"] = False + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "arm") + # Test + alarm_device._connection._session_id = "test" + with pytest.raises(CodeError): + alarm_device.arm("1234", sectors=[4]) def test_device_arm_error(client, mocker): @@ -1189,19 +1242,73 @@ def test_device_arm_code_error(client, mocker): assert device._connection.arm.call_count == 0 -def test_device_disarm_success(client, mocker): +def test_device_disarm_success(alarm_device, mocker): """Should disarm the e-connect system using the underlying client.""" - device = AlarmDevice(client) - mocker.spy(device._connection, "lock") - mocker.spy(device._connection, "disarm") + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "disarm") # Test - device._connection._session_id = "test" - device.disarm("1234", sectors=[4]) + alarm_device._connection._session_id = "test" + alarm_device.disarm("1234", sectors=[4]) - assert device._connection.lock.call_count == 1 - assert device._connection.disarm.call_count == 1 - assert "1234" in device._connection.lock.call_args[0] - assert {"sectors": [4]} == device._connection.disarm.call_args[1] + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.disarm.call_count == 1 + assert "1234" in alarm_device._connection.lock.call_args[0] + assert {"sectors": [4]} == alarm_device._connection.disarm.call_args[1] + + +def test_device_disarm_success_without_panel_details(alarm_device, mocker): + """Should assume `userId` is not required if panel details are empty.""" + alarm_device._inventory = {} + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "disarm") + # Test + alarm_device._connection._session_id = "test" + alarm_device.disarm("1234", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.disarm.call_count == 1 + assert "1234" in alarm_device._connection.lock.call_args[0] + assert {"sectors": [4]} == alarm_device._connection.disarm.call_args[1] + + +def test_device_disarm_success_user_id_not_required(alarm_device, mocker): + """Should not split the code if the login with `userId` is not required.""" + alarm_device._inventory[0]["login_without_user_id"] = True + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "disarm") + # Test + alarm_device._connection._session_id = "test" + alarm_device.disarm("123456", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.disarm.call_count == 1 + assert "123456" in alarm_device._connection.lock.call_args[0] + assert {"user_id": None} == alarm_device._connection.lock.call_args[1] + assert {"sectors": [4]} == alarm_device._connection.disarm.call_args[1] + + +def test_device_disarm_success_with_user_id(alarm_device, mocker): + """Should split the code if the login with `userId` is required.""" + alarm_device._inventory[0]["login_without_user_id"] = False + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "disarm") + # Test + alarm_device._connection._session_id = "test" + alarm_device.disarm("001123456", sectors=[4]) + assert alarm_device._connection.lock.call_count == 1 + assert alarm_device._connection.disarm.call_count == 1 + assert "123456" in alarm_device._connection.lock.call_args[0] + assert {"user_id": "001"} == alarm_device._connection.lock.call_args[1] + assert {"sectors": [4]} == alarm_device._connection.disarm.call_args[1] + + +def test_device_disarm_code_error_with_user_id(alarm_device, mocker): + """Should raise an error if the code can't be split in `userId` and `code`.""" + alarm_device._inventory[0]["login_without_user_id"] = False + mocker.spy(alarm_device._connection, "lock") + mocker.spy(alarm_device._connection, "disarm") + # Test + alarm_device._connection._session_id = "test" + with pytest.raises(CodeError): + alarm_device.disarm("1234", sectors=[4]) def test_device_disarm_error(client, mocker): diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 58b84a1..830b38f 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,6 +1,8 @@ +import pytest +from elmo.api.exceptions import CodeError from homeassistant.core import valid_entity_id -from custom_components.econnect_metronet.helpers import generate_entity_id +from custom_components.econnect_metronet.helpers import generate_entity_id, split_code def test_generate_entity_name_empty(config_entry): @@ -47,3 +49,40 @@ def test_generate_entity_name_with_spaces(config_entry): entity_id = generate_entity_id(config_entry) assert entity_id == "econnect_metronet.econnect_metronet_home_assistant" assert valid_entity_id(entity_id) + + +def test_split_code_with_valid_digits(): + # Should split the numeric user ID and code correctly + code = "123456789012" + assert split_code(code) == ("123456", "789012") + + +def test_split_code_with_exact_six_chars_raises_error(): + # Should raise CodeError for code with less than 7 characters + code = "123456" + with pytest.raises(CodeError) as exc_info: + split_code(code) + assert "format without spaces" in str(exc_info.value) + + +def test_split_code_with_alphanumeric_user_id_raises_error(): + # Should raise CodeError for alphanumeric user ID + code = "USER123456" + with pytest.raises(CodeError) as exc_info: + split_code(code) + assert "user ID and code must be numbers" in str(exc_info.value) + + +def test_split_code_with_special_characters_raises_error(): + # Should raise CodeError for code with special characters + code = "12345@678901" + with pytest.raises(CodeError) as exc_info: + split_code(code) + assert "user ID and code must be numbers" in str(exc_info.value) + + +def test_split_code_with_empty_string_raises_error(): + # Should raise CodeError for empty string + with pytest.raises(CodeError) as exc_info: + split_code("") + assert "format without spaces" in str(exc_info.value)