diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 25f438c..c93b9ad 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,10 +13,10 @@ repos: - id: trailing-whitespace -# - repo: https://github.com/astral-sh/ruff-pre-commit -# rev: v0.3.0 -# hooks: -# - id: ruff + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.4 + hooks: + - id: ruff - repo: https://github.com/pre-commit/pygrep-hooks rev: v1.10.0 diff --git a/docs/operations.rst b/docs/operations.rst index 2f6032c..78852cb 100644 --- a/docs/operations.rst +++ b/docs/operations.rst @@ -357,3 +357,31 @@ Example sequence: - factor: 0.1 - offset: -50 + + + +Examples +====================================== +.. py:currentmodule:: sml2mqtt.config.device + +Energy consumption today +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This will report the power consumption of today + +.. + YamlModel: SmlValueConfig + +.. code-block:: yaml + + obis: '0100010800ff' # Obis code for the energy meter + mqtt: + topic: energy_today # MQTT topic for the meter + operations: + - type: meter + start now: true # Start immediately + reset times: # Reset at midnight + - 00:00 + - round: 1 + - type: change filter # Only report on changes + - refresh action: 01:00 # ... but refresh every hour diff --git a/src/sml2mqtt/__main__.py b/src/sml2mqtt/__main__.py index 33f2710..9f6a9dc 100644 --- a/src/sml2mqtt/__main__.py +++ b/src/sml2mqtt/__main__.py @@ -1,5 +1,5 @@ import asyncio -import platform +import os import sys import traceback @@ -53,7 +53,7 @@ async def a_main(): def main() -> int | str: # This is needed to make async-mqtt work # see https://github.com/sbtinstruments/asyncio-mqtt - if platform.system() == 'Windows': + if sys.platform.lower() == "win32" or os.name.lower() == "nt": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # Load config diff --git a/src/sml2mqtt/config/config.py b/src/sml2mqtt/config/config.py index 7d48c36..ceb7aa3 100644 --- a/src/sml2mqtt/config/config.py +++ b/src/sml2mqtt/config/config.py @@ -1,12 +1,10 @@ -from typing import Union - from easyconfig import AppBaseModel, BaseModel, create_app_config from pydantic import Field from .device import SmlDeviceConfig, SmlValueConfig +from .inputs import HttpSourceSettings, SerialSourceSettings from .logging import LoggingSettings from .mqtt import MqttConfig, OptionalMqttPublishConfig -from .inputs import HttpSourceSettings, SerialSourceSettings from .types import LowerStr, ObisHex diff --git a/src/sml2mqtt/config/mqtt.py b/src/sml2mqtt/config/mqtt.py index d9fe00e..d1c04ce 100644 --- a/src/sml2mqtt/config/mqtt.py +++ b/src/sml2mqtt/config/mqtt.py @@ -2,26 +2,25 @@ import string from easyconfig import BaseModel -from pydantic import Field, StrictBool, conint, constr, field_validator, model_validator +from pydantic import Field, StrictBool, field_validator, model_validator - -QOS = conint(ge=0, le=2) -TOPIC_STR = constr(strip_whitespace=True, min_length=1) -STRIPPED_STR = constr(strip_whitespace=True) +from .types import MqttQosInt, MqttTopicStr, StrippedStr class MqttDefaultPublishConfig(BaseModel): - qos: QOS = Field( - 0, description='Default value for QOS if no other QOS value in the config entry is set') + MqttQosInt: MqttQosInt = Field( + 0, description='Default value for MqttQosInt if no other MqttQosInt value in the config entry is set') retain: StrictBool = Field( False, description='Default value for retain if no other retain value in the config entry is set') class OptionalMqttPublishConfig(BaseModel): - topic: TOPIC_STR | None = Field(None, description='Topic fragment for building this topic with the parent topic') - full_topic: TOPIC_STR | None = Field( + topic: MqttTopicStr | None = Field( + None, description='Topic fragment for building this topic with the parent topic') + full_topic: MqttTopicStr | None = Field( None, alias='full topic', description='Full topic - will ignore the parent topic parts') - qos: QOS | None = Field(None, description='QoS for publishing this value (if set - otherwise use parent)') + MqttQosInt: MqttQosInt | None = Field( + None, description='MqttQosInt for publishing this value (if set - otherwise use parent)') retain: StrictBool | None = Field( None, description='Retain for publishing this value (if set - otherwise use parent)') @@ -47,18 +46,18 @@ def check_full_or_partial(self): class MqttConnection(BaseModel): - identifier: STRIPPED_STR = Field('sml2mqtt-' + ''.join(random.choices(string.ascii_letters, k=13)),) - host: STRIPPED_STR = 'localhost' - port: conint(gt=0) = 1883 - user: STRIPPED_STR = '' - password: STRIPPED_STR = '' + identifier: StrippedStr = Field('sml2mqtt-' + ''.join(random.choices(string.ascii_letters, k=13)),) + host: StrippedStr = 'localhost' + port: int = Field(1883, ge=0) + user: StrippedStr = '' + password: StrippedStr = '' tls: StrictBool = False tls_insecure: StrictBool = Field(False, alias='tls insecure') class MqttConfig(BaseModel): connection: MqttConnection = Field(default_factory=MqttConnection) - topic: TOPIC_STR = Field('sml2mqtt', alias='topic prefix') + topic: MqttTopicStr = Field('sml2mqtt', alias='topic prefix') defaults: MqttDefaultPublishConfig = Field(default_factory=MqttDefaultPublishConfig) last_will: OptionalMqttPublishConfig = Field( default_factory=lambda: OptionalMqttPublishConfig(topic='status'), alias='last will') diff --git a/src/sml2mqtt/config/types.py b/src/sml2mqtt/config/types.py index de6a77d..610916c 100644 --- a/src/sml2mqtt/config/types.py +++ b/src/sml2mqtt/config/types.py @@ -1,5 +1,6 @@ from typing import Annotated, TypeAlias +from annotated_types import Interval from pydantic import StrictFloat, StrictInt, StringConstraints @@ -17,3 +18,8 @@ Number: TypeAlias = StrictInt | StrictFloat PercentStr = Annotated[str, StringConstraints(strip_whitespace=True, pattern=r'^\d+\.?\d*\s*%$')] + +StrippedStr = Annotated[str, StringConstraints(strip_whitespace=True)] + +MqttTopicStr = Annotated[str, StringConstraints(strip_whitespace=True, min_length=1)] +MqttQosInt = Annotated[int, Interval(ge=0, le=2)] diff --git a/src/sml2mqtt/const/task.py b/src/sml2mqtt/const/task.py index b4ecc5f..8d06ec3 100644 --- a/src/sml2mqtt/const/task.py +++ b/src/sml2mqtt/const/task.py @@ -9,14 +9,14 @@ if TYPE_CHECKING: - from collections.abc import Callable, Coroutine + from collections.abc import Awaitable, Callable, Coroutine from sml2mqtt.const.protocols import DeviceProto TASKS: Final[set[asyncio_Task]] = set() -log = logging.getLogger('Tasks') +log = logging.getLogger('sml.tasks') def create_task(coro: Coroutine, *, name: str | None = None): @@ -29,23 +29,25 @@ def create_task(coro: Coroutine, *, name: str | None = None): async def wait_for_tasks(): while True: - for task in TASKS: + for task in TASKS.copy(): if not task.done(): # these are the raw tasks - # Exceptions are handled either in Task or DeviceTask so we ignore those here + # Exceptions are handled either in Task or DeviceTask, so we ignore those here try: await task except CancelledError: pass except Exception: - pass + log.exception('Uncaught error in Task!') break else: break + log.debug('All tasks done') + class Task: - def __init__(self, coro: Callable[[], Coroutine], *, name: str): + def __init__(self, coro: Callable[[], Awaitable], *, name: str): self._coro: Final = coro self._name: Final = name @@ -91,6 +93,8 @@ async def _wrapper(self): if task is self._task: self._task = None + log.debug(f'{self._name:s} finished!') + def process_exception(self, e: Exception): log.error(f'Error in {self._name:s}') for line in traceback.format_exc().splitlines(): diff --git a/src/sml2mqtt/mqtt/mqtt.py b/src/sml2mqtt/mqtt/mqtt.py index 790964e..1a7fc46 100644 --- a/src/sml2mqtt/mqtt/mqtt.py +++ b/src/sml2mqtt/mqtt/mqtt.py @@ -1,3 +1,4 @@ +import asyncio import traceback from asyncio import CancelledError, Event, Queue, TimeoutError, wait_for from typing import Final @@ -15,32 +16,20 @@ log = _parent_logger.getChild('mqtt') +TASK: Task | None = None IS_CONNECTED: Event | None = None async def start(): - global IS_CONNECTED + global IS_CONNECTED, TASK - assert not TASK.is_running + assert TASK is None IS_CONNECTED = Event() - TASK.start() - - on_shutdown(_shutdown, 'Shutdown mqtt') - - -async def _shutdown(): - await TASK.cancel_and_wait() - + TASK = Task(_mqtt_task, name='MQTT Task') -async def mqtt_task(): - try: - await _mqtt_task() - finally: - log.debug('Task finished') - - -TASK: Final = Task(mqtt_task, name='MQTT Task') + on_shutdown(TASK.cancel_and_wait, 'Shutdown mqtt') + TASK.start() async def wait_for_connect(timeout: float): @@ -113,6 +102,7 @@ async def _mqtt_task(): # Since we disconnect gracefully we have to manually sent the offline status await client.publish(will_topic.topic, payload_offline, will_topic.qos, will_topic.retain) shutdown = True + log.info('Disconnecting') except MqttError as e: delay.increase() diff --git a/src/sml2mqtt/mqtt/mqtt_obj.py b/src/sml2mqtt/mqtt/mqtt_obj.py index 7f6eb6b..8cbc561 100644 --- a/src/sml2mqtt/mqtt/mqtt_obj.py +++ b/src/sml2mqtt/mqtt/mqtt_obj.py @@ -26,8 +26,8 @@ def patch_analyze(): class MqttCfg: topic_full: str | None = None topic_fragment: str | None = None - qos: str | None = None - retain: str | None = None + qos: int | None = None + retain: bool | None = None def set_config(self, config: OptionalMqttPublishConfig | None): self.topic_full = config.full_topic @@ -90,7 +90,7 @@ def _merge_values(self) -> 'MqttObj': self.retain = self.parent.retain return self - def set_topic(self, topic: str) -> 'MqttObj': + def set_topic(self, topic: str | None) -> 'MqttObj': self.cfg.topic_fragment = topic self.update() return self diff --git a/src/sml2mqtt/runtime/shutdown.py b/src/sml2mqtt/runtime/shutdown.py index 66bac90..4f330a6 100644 --- a/src/sml2mqtt/runtime/shutdown.py +++ b/src/sml2mqtt/runtime/shutdown.py @@ -26,7 +26,7 @@ async def do(self): try: log.debug(self.msg) await self.coro() - log.debug('-> done!') + log.debug(f'{self.msg:s} done!') except Exception as e: log.error(str(e)) tb = traceback.format_exc().splitlines() diff --git a/src/sml2mqtt/sml_device/setup_device.py b/src/sml2mqtt/sml_device/setup_device.py index 11788eb..b510738 100644 --- a/src/sml2mqtt/sml_device/setup_device.py +++ b/src/sml2mqtt/sml_device/setup_device.py @@ -8,9 +8,12 @@ LimitValueOperation, OffsetOperation, OnChangeFilterOperation, + OrOperation, RefreshActionOperation, RoundOperation, - SkipZeroMeterOperation, VirtualMeterOperation, + SequenceOperation, + SkipZeroMeterOperation, + VirtualMeterOperation, ) from sml2mqtt.sml_value.setup_operations import setup_operations @@ -22,27 +25,45 @@ from sml2mqtt.config.device import SmlDeviceConfig from sml2mqtt.const import SmlFrameValues from sml2mqtt.sml_device import SmlDevice + from sml2mqtt.sml_value.base import OperationContainerBase, ValueOperationBase -def _create_default_transformations(sml_value: SmlValue, frame: SmlFrameValues, general_cfg: GeneralSettings): +def has_operation_type(obj: OperationContainerBase, *ops: type[ValueOperationBase], + is_of: bool = True) -> ValueOperationBase | None: + for op in obj.operations: + if isinstance(op, ops) == is_of: + return op + if (isinstance(op, (OrOperation, SequenceOperation)) and + (ret := has_operation_type(op, is_of=is_of)) is not None): + return ret + return None + + +def _create_default_transformations(log: logging.Logger, sml_value: SmlValue, frame: SmlFrameValues, + general_cfg: GeneralSettings): + + op_count = len(sml_value.operations) + if (entry := frame.get_value(sml_value.obis)) is not None and entry.unit == 30: if general_cfg.wh_in_kwh: - sml_value.insert_operation(FactorOperation(1 / 1000)) - - if not general_cfg.report_blank_energy_meters: - for op in sml_value.operations: - if isinstance(op, VirtualMeterOperation): - break + if op := has_operation_type(sml_value, FactorOperation): + log.debug(f'Found {op.__class__.__name__:s} - skip creating default factor') else: - sml_value.insert_operation(SkipZeroMeterOperation()) + sml_value.insert_operation(FactorOperation(1 / 1000)) + + # If the user has created something for the meter we don't skip it + if not op_count: + sml_value.insert_operation(SkipZeroMeterOperation()) def _create_default_filters(log: logging.Logger, sml_value: SmlValue, general_cfg: GeneralSettings): - for op in sml_value.operations: - if not isinstance(op, ( - FactorOperation, OffsetOperation, RoundOperation, LimitValueOperation, SkipZeroMeterOperation)): - log.debug(f'Found {op.__class__.__name__:s} - skip creating of default filters') - return None + if op := has_operation_type( + sml_value, + FactorOperation, OffsetOperation, RoundOperation, LimitValueOperation, SkipZeroMeterOperation, + is_of=False + ): + log.debug(f'Found {op.__class__.__name__:s} - skip creating default filters') + return None log.info(f'No filters found for {sml_value.obis}, creating default filters') @@ -84,7 +105,7 @@ def setup_device(device: SmlDevice, frame: SmlFrameValues, cfg: SmlDeviceConfig device.sml_values.add_value(sml_value) setup_operations(sml_value, value_cfg) - _create_default_transformations(sml_value, frame, general_cfg) + _create_default_transformations(device.log, sml_value, frame, general_cfg) _create_default_filters(device.log, sml_value, general_cfg) else: # No config found -> ignore defaults @@ -95,5 +116,5 @@ def setup_device(device: SmlDevice, frame: SmlFrameValues, cfg: SmlDeviceConfig sml_value = SmlValue(obis, mqtt_device.create_child(topic_fragment=obis)) device.sml_values.add_value(sml_value) - _create_default_transformations(sml_value, frame, general_cfg) + _create_default_transformations(device.log, sml_value, frame, general_cfg) _create_default_filters(device.log, sml_value, general_cfg) diff --git a/src/sml2mqtt/sml_source/serial.py b/src/sml2mqtt/sml_source/serial.py index 45d4cc9..5b9fb01 100644 --- a/src/sml2mqtt/sml_source/serial.py +++ b/src/sml2mqtt/sml_source/serial.py @@ -3,6 +3,7 @@ import asyncio import logging from asyncio import Protocol +from time import monotonic from typing import TYPE_CHECKING, Final from serial_asyncio import SerialTransport, create_serial_connection @@ -42,6 +43,8 @@ def __init__(self, device: DeviceProto, url: str) -> None: self._task: Final = DeviceTask(device, self._chunk_task, name=f'Serial Task {self.device.name:s}') + self.last_read: float | None = 0.0 + def start(self): self._task.start() @@ -68,6 +71,8 @@ def connection_lost(self, exc: Exception | None) -> None: def data_received(self, data: bytes): self.transport.pause_reading() + + self.last_read = monotonic() self.device.on_source_data(data) async def _chunk_task(self): @@ -76,5 +81,11 @@ async def _chunk_task(self): while True: await asyncio.sleep(interval) + if self.last_read is not None: + diff_to_interval = interval - (monotonic() - self.last_read) + self.last_read = None + if diff_to_interval >= 0.001: + await asyncio.sleep(diff_to_interval) + # safe to be called multiple times in a row self.transport.resume_reading() diff --git a/src/sml2mqtt/sml_value/operations/operations.py b/src/sml2mqtt/sml_value/operations/operations.py index eb37abc..09ffacd 100644 --- a/src/sml2mqtt/sml_value/operations/operations.py +++ b/src/sml2mqtt/sml_value/operations/operations.py @@ -8,7 +8,7 @@ class OrOperation(ValueOperationBase, OperationContainerBase): @override - def process_value(self, value: float, info: SmlValueInfo) -> float | None: + def process_value(self, value: float | None, info: SmlValueInfo) -> float | None: ret: float | None = None for op in self.operations: if (call := op.process_value(value, info)) is not None and ret is None: @@ -28,7 +28,7 @@ def describe(self, indent: str = '') -> Generator[str, None, None]: class SequenceOperation(ValueOperationBase, OperationContainerBase): @override - def process_value(self, value: float, info: SmlValueInfo) -> float | None: + def process_value(self, value: float | None, info: SmlValueInfo) -> float | None: for op in self.operations: value = op.process_value(value, info) return value diff --git a/tests/test_docs.py b/tests/test_docs.py index 1079ffb..0e66604 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -9,8 +9,8 @@ from pydantic import BaseModel import sml2mqtt -from sml2mqtt.config.operations import HasDateTimeFields, HasIntervalFields from sml2mqtt.config.inputs import SmlSourceSettingsBase +from sml2mqtt.config.operations import HasDateTimeFields, HasIntervalFields from sml2mqtt.sml_value.setup_operations import MAPPING, setup_operations