Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemanspiff2007 committed Mar 25, 2024
1 parent 0f9cfa7 commit ecc9bb1
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 72 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ repos:
- id: trailing-whitespace


# - repo: https://github.com/astral-sh/ruff-pre-commit
# rev: v0.3.0
# hooks:
# - id: ruff
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.4
hooks:
- id: ruff

- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
Expand Down
28 changes: 28 additions & 0 deletions docs/operations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,31 @@ Example
sequence:
- factor: 0.1
- offset: -50
Examples
======================================
.. py:currentmodule:: sml2mqtt.config.device
Energy consumption today
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This will report the power consumption of today

..
YamlModel: SmlValueConfig
.. code-block:: yaml
obis: '0100010800ff' # Obis code for the energy meter
mqtt:
topic: energy_today # MQTT topic for the meter
operations:
- type: meter
start now: true # Start immediately
reset times: # Reset at midnight
- 00:00
- round: 1
- type: change filter # Only report on changes
- refresh action: 01:00 # ... but refresh every hour
4 changes: 2 additions & 2 deletions src/sml2mqtt/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
import platform
import os
import sys
import traceback

Expand Down Expand Up @@ -53,7 +53,7 @@ async def a_main():
def main() -> int | str:
# This is needed to make async-mqtt work
# see https://github.com/sbtinstruments/asyncio-mqtt
if platform.system() == 'Windows':
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# Load config
Expand Down
4 changes: 1 addition & 3 deletions src/sml2mqtt/config/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from typing import Union

from easyconfig import AppBaseModel, BaseModel, create_app_config
from pydantic import Field

from .device import SmlDeviceConfig, SmlValueConfig
from .inputs import HttpSourceSettings, SerialSourceSettings
from .logging import LoggingSettings
from .mqtt import MqttConfig, OptionalMqttPublishConfig
from .inputs import HttpSourceSettings, SerialSourceSettings
from .types import LowerStr, ObisHex


Expand Down
31 changes: 15 additions & 16 deletions src/sml2mqtt/config/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@
import string

from easyconfig import BaseModel
from pydantic import Field, StrictBool, conint, constr, field_validator, model_validator
from pydantic import Field, StrictBool, field_validator, model_validator


QOS = conint(ge=0, le=2)
TOPIC_STR = constr(strip_whitespace=True, min_length=1)
STRIPPED_STR = constr(strip_whitespace=True)
from .types import MqttQosInt, MqttTopicStr, StrippedStr


class MqttDefaultPublishConfig(BaseModel):
qos: QOS = Field(
0, description='Default value for QOS if no other QOS value in the config entry is set')
MqttQosInt: MqttQosInt = Field(
0, description='Default value for MqttQosInt if no other MqttQosInt value in the config entry is set')
retain: StrictBool = Field(
False, description='Default value for retain if no other retain value in the config entry is set')


class OptionalMqttPublishConfig(BaseModel):
topic: TOPIC_STR | None = Field(None, description='Topic fragment for building this topic with the parent topic')
full_topic: TOPIC_STR | None = Field(
topic: MqttTopicStr | None = Field(
None, description='Topic fragment for building this topic with the parent topic')
full_topic: MqttTopicStr | None = Field(
None, alias='full topic', description='Full topic - will ignore the parent topic parts')
qos: QOS | None = Field(None, description='QoS for publishing this value (if set - otherwise use parent)')
MqttQosInt: MqttQosInt | None = Field(
None, description='MqttQosInt for publishing this value (if set - otherwise use parent)')
retain: StrictBool | None = Field(
None, description='Retain for publishing this value (if set - otherwise use parent)')

Expand All @@ -47,18 +46,18 @@ def check_full_or_partial(self):


class MqttConnection(BaseModel):
identifier: STRIPPED_STR = Field('sml2mqtt-' + ''.join(random.choices(string.ascii_letters, k=13)),)
host: STRIPPED_STR = 'localhost'
port: conint(gt=0) = 1883
user: STRIPPED_STR = ''
password: STRIPPED_STR = ''
identifier: StrippedStr = Field('sml2mqtt-' + ''.join(random.choices(string.ascii_letters, k=13)),)
host: StrippedStr = 'localhost'
port: int = Field(1883, ge=0)
user: StrippedStr = ''
password: StrippedStr = ''
tls: StrictBool = False
tls_insecure: StrictBool = Field(False, alias='tls insecure')


class MqttConfig(BaseModel):
connection: MqttConnection = Field(default_factory=MqttConnection)
topic: TOPIC_STR = Field('sml2mqtt', alias='topic prefix')
topic: MqttTopicStr = Field('sml2mqtt', alias='topic prefix')
defaults: MqttDefaultPublishConfig = Field(default_factory=MqttDefaultPublishConfig)
last_will: OptionalMqttPublishConfig = Field(
default_factory=lambda: OptionalMqttPublishConfig(topic='status'), alias='last will')
6 changes: 6 additions & 0 deletions src/sml2mqtt/config/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Annotated, TypeAlias

from annotated_types import Interval
from pydantic import StrictFloat, StrictInt, StringConstraints


Expand All @@ -17,3 +18,8 @@
Number: TypeAlias = StrictInt | StrictFloat

PercentStr = Annotated[str, StringConstraints(strip_whitespace=True, pattern=r'^\d+\.?\d*\s*%$')]

StrippedStr = Annotated[str, StringConstraints(strip_whitespace=True)]

MqttTopicStr = Annotated[str, StringConstraints(strip_whitespace=True, min_length=1)]
MqttQosInt = Annotated[int, Interval(ge=0, le=2)]
16 changes: 10 additions & 6 deletions src/sml2mqtt/const/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@


if TYPE_CHECKING:
from collections.abc import Callable, Coroutine
from collections.abc import Awaitable, Callable, Coroutine

from sml2mqtt.const.protocols import DeviceProto


TASKS: Final[set[asyncio_Task]] = set()

log = logging.getLogger('Tasks')
log = logging.getLogger('sml.tasks')


def create_task(coro: Coroutine, *, name: str | None = None):
Expand All @@ -29,23 +29,25 @@ def create_task(coro: Coroutine, *, name: str | None = None):

async def wait_for_tasks():
while True:
for task in TASKS:
for task in TASKS.copy():
if not task.done():
# these are the raw tasks
# Exceptions are handled either in Task or DeviceTask so we ignore those here
# Exceptions are handled either in Task or DeviceTask, so we ignore those here
try:
await task
except CancelledError:
pass
except Exception:
pass
log.exception('Uncaught error in Task!')
break
else:
break

log.debug('All tasks done')


class Task:
def __init__(self, coro: Callable[[], Coroutine], *, name: str):
def __init__(self, coro: Callable[[], Awaitable], *, name: str):
self._coro: Final = coro
self._name: Final = name

Expand Down Expand Up @@ -91,6 +93,8 @@ async def _wrapper(self):
if task is self._task:
self._task = None

log.debug(f'{self._name:s} finished!')

def process_exception(self, e: Exception):
log.error(f'Error in {self._name:s}')
for line in traceback.format_exc().splitlines():
Expand Down
26 changes: 8 additions & 18 deletions src/sml2mqtt/mqtt/mqtt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import traceback
from asyncio import CancelledError, Event, Queue, TimeoutError, wait_for
from typing import Final
Expand All @@ -15,32 +16,20 @@
log = _parent_logger.getChild('mqtt')


TASK: Task | None = None
IS_CONNECTED: Event | None = None


async def start():
global IS_CONNECTED
global IS_CONNECTED, TASK

assert not TASK.is_running
assert TASK is None

IS_CONNECTED = Event()
TASK.start()

on_shutdown(_shutdown, 'Shutdown mqtt')


async def _shutdown():
await TASK.cancel_and_wait()

TASK = Task(_mqtt_task, name='MQTT Task')

async def mqtt_task():
try:
await _mqtt_task()
finally:
log.debug('Task finished')


TASK: Final = Task(mqtt_task, name='MQTT Task')
on_shutdown(TASK.cancel_and_wait, 'Shutdown mqtt')
TASK.start()


async def wait_for_connect(timeout: float):
Expand Down Expand Up @@ -113,6 +102,7 @@ async def _mqtt_task():
# Since we disconnect gracefully we have to manually sent the offline status
await client.publish(will_topic.topic, payload_offline, will_topic.qos, will_topic.retain)
shutdown = True
log.info('Disconnecting')

except MqttError as e:
delay.increase()
Expand Down
6 changes: 3 additions & 3 deletions src/sml2mqtt/mqtt/mqtt_obj.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def patch_analyze():
class MqttCfg:
topic_full: str | None = None
topic_fragment: str | None = None
qos: str | None = None
retain: str | None = None
qos: int | None = None
retain: bool | None = None

def set_config(self, config: OptionalMqttPublishConfig | None):
self.topic_full = config.full_topic
Expand Down Expand Up @@ -90,7 +90,7 @@ def _merge_values(self) -> 'MqttObj':
self.retain = self.parent.retain
return self

def set_topic(self, topic: str) -> 'MqttObj':
def set_topic(self, topic: str | None) -> 'MqttObj':
self.cfg.topic_fragment = topic
self.update()
return self
Expand Down
2 changes: 1 addition & 1 deletion src/sml2mqtt/runtime/shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def do(self):
try:
log.debug(self.msg)
await self.coro()
log.debug('-> done!')
log.debug(f'{self.msg:s} done!')
except Exception as e:
log.error(str(e))
tb = traceback.format_exc().splitlines()
Expand Down
53 changes: 37 additions & 16 deletions src/sml2mqtt/sml_device/setup_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
LimitValueOperation,
OffsetOperation,
OnChangeFilterOperation,
OrOperation,
RefreshActionOperation,
RoundOperation,
SkipZeroMeterOperation, VirtualMeterOperation,
SequenceOperation,
SkipZeroMeterOperation,
VirtualMeterOperation,
)
from sml2mqtt.sml_value.setup_operations import setup_operations

Expand All @@ -22,27 +25,45 @@
from sml2mqtt.config.device import SmlDeviceConfig
from sml2mqtt.const import SmlFrameValues
from sml2mqtt.sml_device import SmlDevice
from sml2mqtt.sml_value.base import OperationContainerBase, ValueOperationBase


def _create_default_transformations(sml_value: SmlValue, frame: SmlFrameValues, general_cfg: GeneralSettings):
def has_operation_type(obj: OperationContainerBase, *ops: type[ValueOperationBase],
is_of: bool = True) -> ValueOperationBase | None:
for op in obj.operations:
if isinstance(op, ops) == is_of:
return op
if (isinstance(op, (OrOperation, SequenceOperation)) and
(ret := has_operation_type(op, is_of=is_of)) is not None):
return ret
return None


def _create_default_transformations(log: logging.Logger, sml_value: SmlValue, frame: SmlFrameValues,
general_cfg: GeneralSettings):

op_count = len(sml_value.operations)

if (entry := frame.get_value(sml_value.obis)) is not None and entry.unit == 30:
if general_cfg.wh_in_kwh:
sml_value.insert_operation(FactorOperation(1 / 1000))

if not general_cfg.report_blank_energy_meters:
for op in sml_value.operations:
if isinstance(op, VirtualMeterOperation):
break
if op := has_operation_type(sml_value, FactorOperation):
log.debug(f'Found {op.__class__.__name__:s} - skip creating default factor')
else:
sml_value.insert_operation(SkipZeroMeterOperation())
sml_value.insert_operation(FactorOperation(1 / 1000))

# If the user has created something for the meter we don't skip it
if not op_count:
sml_value.insert_operation(SkipZeroMeterOperation())


def _create_default_filters(log: logging.Logger, sml_value: SmlValue, general_cfg: GeneralSettings):
for op in sml_value.operations:
if not isinstance(op, (
FactorOperation, OffsetOperation, RoundOperation, LimitValueOperation, SkipZeroMeterOperation)):
log.debug(f'Found {op.__class__.__name__:s} - skip creating of default filters')
return None
if op := has_operation_type(
sml_value,
FactorOperation, OffsetOperation, RoundOperation, LimitValueOperation, SkipZeroMeterOperation,
is_of=False
):
log.debug(f'Found {op.__class__.__name__:s} - skip creating default filters')
return None

log.info(f'No filters found for {sml_value.obis}, creating default filters')

Expand Down Expand Up @@ -84,7 +105,7 @@ def setup_device(device: SmlDevice, frame: SmlFrameValues, cfg: SmlDeviceConfig
device.sml_values.add_value(sml_value)

setup_operations(sml_value, value_cfg)
_create_default_transformations(sml_value, frame, general_cfg)
_create_default_transformations(device.log, sml_value, frame, general_cfg)
_create_default_filters(device.log, sml_value, general_cfg)
else:
# No config found -> ignore defaults
Expand All @@ -95,5 +116,5 @@ def setup_device(device: SmlDevice, frame: SmlFrameValues, cfg: SmlDeviceConfig
sml_value = SmlValue(obis, mqtt_device.create_child(topic_fragment=obis))
device.sml_values.add_value(sml_value)

_create_default_transformations(sml_value, frame, general_cfg)
_create_default_transformations(device.log, sml_value, frame, general_cfg)
_create_default_filters(device.log, sml_value, general_cfg)
Loading

0 comments on commit ecc9bb1

Please sign in to comment.