Skip to content

Commit

Permalink
add possibility to reload actions
Browse files Browse the repository at this point in the history
  • Loading branch information
pszafer committed Sep 12, 2023
1 parent 6732733 commit 03145e2
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 47 deletions.
3 changes: 3 additions & 0 deletions boneio/helper/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def is_topic_in_autodiscovery(self, topic: str) -> bool:
if topic in self._autodiscovery_messages[ha_type]:
return True
return False

def clear_autodiscovery_type(self, ha_type: str):
self._autodiscovery_messages[ha_type] = {}



Expand Down
8 changes: 8 additions & 0 deletions boneio/helper/gpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,15 @@ def __init__(
self._press_callback = press_callback
setup_input(pin=self._pin, pull_mode=gpio_mode)

def set_press_callback(self, press_callback: Callable[[ClickTypes, str, bool | None], None]) -> None:
self._press_callback = press_callback

@property
def is_pressed(self) -> bool:
"""Is button pressed."""
return read_input(self._pin)

@property
def pin(self) -> str:
"""Return configured pin."""
return self._pin
62 changes: 45 additions & 17 deletions boneio/helper/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,26 +342,40 @@ def configure_event_sensor(
pin: str,
press_callback: Callable,
send_ha_autodiscovery: Callable,
) -> str | None:
input: GpioEventButton | GpioEventButtonBeta | None = None
) -> GpioEventButton | GpioEventButtonBeta | None:
"""Configure input sensor or button."""
try:
GpioEventButtonClass = (
GpioEventButton
if gpio.get("detection_type", "stable") == "stable"
else GpioEventButtonBeta
)
GpioEventButtonClass(
pin=pin,
press_callback=lambda x, i, z: press_callback(
if input:
if not isinstance(input, GpioEventButtonClass):
_LOGGER.warn("You preconfigured type of input. It's forbidden. Please restart boneIO.")
return input
input.set_press_callback(press_callback=lambda x, i, z: press_callback(
x=x,
inpin=i,
actions=gpio.get(ACTIONS, {}).get(x, []),
input_type=INPUT,
empty_message_after=gpio.get("clear_message", False),
duration=z,
),
**gpio,
)
))
else:
input = GpioEventButtonClass(
pin=pin,
press_callback=lambda x, i, z: press_callback(
x=x,
inpin=i,
actions=gpio.get(ACTIONS, {}).get(x, []),
input_type=INPUT,
empty_message_after=gpio.get("clear_message", False),
duration=z,
),
**gpio,
)
if gpio.get(SHOW_HA, True):
send_ha_autodiscovery(
id=pin,
Expand All @@ -370,7 +384,7 @@ def configure_event_sensor(
device_class=gpio.get(DEVICE_CLASS, None),
availability_msg_func=ha_event_availabilty_message,
)
return pin
return input
except GPIOInputException as err:
_LOGGER.error("This PIN %s can't be configured. %s", pin, err)
pass
Expand All @@ -381,25 +395,39 @@ def configure_binary_sensor(
pin: str,
press_callback: Callable,
send_ha_autodiscovery: Callable,
) -> str | None:
input: GpioInputBinarySensor | GpioInputBinarySensorBeta | None = None
) -> GpioInputBinarySensor | GpioInputBinarySensorBeta | None:
"""Configure input sensor or button."""
try:
GpioInputBinarySensorClass = (
GpioInputBinarySensor
if gpio.get("detection_type", "stable") == "stable"
else GpioInputBinarySensorBeta
)
GpioInputBinarySensorClass(
pin=pin,
press_callback=lambda x, i: press_callback(
if input:
if not isinstance(input, GpioInputBinarySensorClass):
_LOGGER.warn("You preconfigured type of input. It's forbidden. Please restart boneIO.")
return input
input.set_press_callback(press_callback=lambda x, i, z: press_callback(
x=x,
inpin=i,
actions=gpio.get(ACTIONS, {}).get(x, []),
input_type=INPUT_SENSOR,
input_type=INPUT,
empty_message_after=gpio.get("clear_message", False),
),
**gpio,
)
duration=z,
))
else:
input = GpioInputBinarySensorClass(
pin=pin,
press_callback=lambda x, i: press_callback(
x=x,
inpin=i,
actions=gpio.get(ACTIONS, {}).get(x, []),
input_type=INPUT_SENSOR,
empty_message_after=gpio.get("clear_message", False),
),
**gpio,
)
if gpio.get(SHOW_HA, True):
send_ha_autodiscovery(
id=pin,
Expand All @@ -408,7 +436,7 @@ def configure_binary_sensor(
device_class=gpio.get(DEVICE_CLASS, None),
availability_msg_func=ha_binary_sensor_availabilty_message,
)
return pin
return input
except GPIOInputException as err:
_LOGGER.error("This PIN %s can't be configured. %s", pin, err)
pass
Expand Down
64 changes: 36 additions & 28 deletions boneio/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
from boneio.const import (
ACTION,
ADDRESS,
BINARY_SENSOR,
BUTTON,
CLOSE,
COVER,
DALLAS,
EVENT_ENTITY,
ID,
INPUT,
LM75,
Expand Down Expand Up @@ -116,6 +118,7 @@ def __init__(
self.send_message = send_message
self.stop_client = stop_client
self._event_pins = event_pins
self._inputs = {}
self._binary_pins = binary_pins
self._i2cbusio = I2C(SCL, SDA)
self._mcp = {}
Expand Down Expand Up @@ -251,35 +254,40 @@ def __init__(
_LOGGER.info("BoneIO manager is ready.")

def configure_inputs(self, reload_config: bool = False):
used_pins = set()
if reload_config:
load_config_from_file(self._config_file_path)
for gpio in self._event_pins:
"""Configure inputs. Either events or binary sensors."""
def check_if_pin_configured(pin: str) -> bool:
if pin in self._inputs:
if not reload_config:
_LOGGER.warn("This PIN %s is already configured. Omitting it.", pin)
return True
return False

def configure_single_input(configure_sensor_func, gpio):
pin = gpio.pop(PIN)
if pin in used_pins:
_LOGGER.warn("This PIN %s is already configured. Omitting it.", pin)
continue
used_pins.add(
configure_event_sensor(
gpio=gpio,
pin=pin,
press_callback=self.press_callback,
send_ha_autodiscovery=self.send_ha_autodiscovery,
)
if check_if_pin_configured(pin=pin):
return False
input = configure_sensor_func(
gpio=gpio,
pin=pin,
press_callback=self.press_callback,
send_ha_autodiscovery=self.send_ha_autodiscovery,
input=self._inputs.get(pin, None)
)
if input:
self._inputs[input.pin] = input
return True

if reload_config:
config = load_config_from_file(self._config_file_path)
if config:
self._event_pins = config.get(EVENT_ENTITY, [])
self._binary_pins = config.get(BINARY_SENSOR, [])
self._config_helper.clear_autodiscovery_type(ha_type=EVENT_ENTITY)
self._config_helper.clear_autodiscovery_type(ha_type=BINARY_SENSOR)
for gpio in self._event_pins:
configure_single_input(configure_sensor_func=configure_event_sensor, gpio=gpio)
for gpio in self._binary_pins:
pin = gpio.pop(PIN)
if pin in used_pins:
_LOGGER.warn("This PIN %s is already configured. Omitting it.", pin)
continue
used_pins.add(
configure_binary_sensor(
gpio=gpio,
pin=pin,
press_callback=self.press_callback,
send_ha_autodiscovery=self.send_ha_autodiscovery,
)
)
configure_single_input(configure_sensor_func=configure_binary_sensor, gpio=gpio)

def append_task(self, coro: Coroutine, name: str = "Unknown") -> asyncio.Future:
"""Add task to run with asyncio loop."""
Expand Down Expand Up @@ -465,7 +473,7 @@ def prepare_ha_buttons(self) -> None:
)
self.send_ha_autodiscovery(
id="inputs_reload",
name="Reload events and sensor",
name="Reload actions",
ha_type=BUTTON,
payload_press="inputs_reload",
availability_msg_func=ha_button_availabilty_message,
Expand Down Expand Up @@ -636,7 +644,7 @@ async def receive_message(self, topic: str, message: str) -> None:
elif device_id == "restart" and message == "restart":
_LOGGER.info("Exiting process. Systemd should restart it soon.")
await self.stop_client()
elif device_id == "inputs_reload" and message == "input_reload":
elif device_id == "inputs_reload" and message == "inputs_reload":
_LOGGER.info("Reloading events and binary sensors")
self.configure_inputs(reload_config=True)

Expand Down
2 changes: 1 addition & 1 deletion boneio/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async def _subscribe_manager(self, manager: Manager) -> None:
await asyncio.gather(*tasks)

async def handle_messages(self, messages: Any, callback: Callable[[str, str], Awaitable[None]]):
"""Handle messages with callback or remove osbolete HA discovery messages."""
"""Handle messages with callback or remove obsolete HA discovery messages."""
async for message in messages:
payload = message.payload.decode()
callback_start = True
Expand Down
1 change: 0 additions & 1 deletion boneio/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from boneio.helper.config import ConfigHelper
from boneio.manager import Manager
from boneio.mqtt_client import MQTTClient
from boneio.helper.events import EventBus

_LOGGER = logging.getLogger(__name__)

Expand Down

0 comments on commit 03145e2

Please sign in to comment.