Skip to content

Commit

Permalink
Fixed target originator for notifications sent via MQTT.
Browse files Browse the repository at this point in the history
  • Loading branch information
ankraft committed Jan 22, 2023
1 parent 7c1ac53 commit c55d4ba
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 35 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [0.11.2] - 2023-01-23

### Fixed
- [MQTT] Fixed target originator for notifications sent via MQTT.


## [0.11.1] - 2022-12-16

### Added
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# ACME oneM2M CSE
An open source CSE Middleware for Education.

Version 0.11.1
Version 0.11.2

[![oneM2M](https://img.shields.io/badge/oneM2M-f00)](https://www.onem2m.org) [![Python](https://img.shields.io/badge/Python-3.8-blue)](https://www.python.org) [![Maintenance](https://img.shields.io/badge/Maintained-Yes-green.svg)](https://github.com/ankraft/ACME-oneM2M-CSE/graphs/commit-activity) [![License](https://img.shields.io/badge/License-BSD%203--Clause-green)](LICENSE) [![MyPy](https://img.shields.io/badge/MyPy-covered-green)](LICENSE)
[![Twitter](https://img.shields.io/twitter/url/https/twitter.com/acmeCSE.svg?style=social&label=%40acmeCSE)](https://twitter.com/acmeCSE)
Expand Down
2 changes: 1 addition & 1 deletion acme/etc/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Constants(object):
""" Various CSE and oneM2M constants """

version = '0.11.1'
version = '0.11.2'
""" ACME's release version """

textLogo = '[dim][[/dim][red][i]ACME[/i][/red][dim]][/dim]'
Expand Down
146 changes: 113 additions & 33 deletions acme/services/MQTTClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
"""

from __future__ import annotations
from typing import Tuple, cast, Dict, Optional
from typing import Tuple, cast, Dict, Optional, Any

from urllib.parse import urlparse
from threading import Lock

from ..etc.Types import JSON, Operation, CSERequest, ContentSerializationType, RequestType, ResourceTypes, Result, ResponseStatusCode, ResourceTypes
from ..etc import Utils as Utils, DateUtils, RequestUtils
from ..etc.RequestUtils import requestFromResult, serializeData
from ..etc.DateUtils import getResourceDate, waitFor
from ..etc.Utils import exceptionToResult, uniqueRI, toSPRelative, renameThread
from ..helpers.MQTTConnection import MQTTConnection, MQTTHandler, idToMQTT, idToMQTTClientID
from ..helpers import TextTools
from ..services.Configuration import Configuration
Expand All @@ -31,12 +33,40 @@ class MQTTClientHandler(MQTTHandler):
topicPrefixCont: Count of elements in the prefix.
"""

__slots__ = (
'mqttClient',
'topicPrefix',
'topicPrefixCount',
'operationEvents',

'_eventMqttCreate',
'_eventMqttRetrieve',
'_eventMqttUpdate',
'_eventMqttDelete',
'_eventMqttNotify',
)

def __init__(self, mqttClient:MQTTClient) -> None:
super().__init__()
self.mqttClient = mqttClient
self.topicPrefix = mqttClient.topicPrefix
self.topicPrefixCount = len(self.topicPrefix.split('/')) # Count the elements for the prefix

# Optimize event handling
self._eventMqttCreate = CSE.event.mqttCreate # type: ignore [attr-defined]
self._eventMqttRetrieve = CSE.event.mqttRetrieve # type: ignore [attr-defined]
self._eventMqttUpdate = CSE.event.mqttUpdate # type: ignore [attr-defined]
self._eventMqttDelete = CSE.event.mqttDelete # type: ignore [attr-defined]
self._eventMqttNotify = CSE.event.mqttNotify # type: ignore [attr-defined]

self.operationEvents = {
Operation.CREATE: [self._eventMqttCreate, 'MQCR'],
Operation.RETRIEVE: [self._eventMqttRetrieve, 'MQRE'],
Operation.UPDATE: [self._eventMqttUpdate, 'MQUP'],
Operation.DELETE: [self._eventMqttDelete, 'MQDE'],
Operation.NOTIFY: [self._eventMqttNotify, 'MQNO'],
}


def onConnect(self, connection:MQTTConnection) -> bool:
""" When connected to a broker then register the topics the CSE listens to.
Expand Down Expand Up @@ -122,6 +152,7 @@ def _responseCB(self, connection:MQTTConnection, topic:str, data:bytes) -> None:
# Dissect Body
contentType:str = ts[-1]
if not (dissectResult := CSE.request.dissectRequestFromBytes(data, contentType, isResponse=True)).status:
L.isWarn and L.logWarn(f'Error receiving MQTT response: {dissectResult.dbg}')
return

# Add it to a response queue in the manager
Expand Down Expand Up @@ -220,23 +251,19 @@ def _logRequest(result:Result) -> None:
if self.mqttClient.isStopped:
_sendResponse(Result.errorResult(rsc = ResponseStatusCode.internalServerError, request = dissectResult.request, dbg = 'mqtt server not running'))
return



# send events for the MQTT operations
# TODO rename current thread similar to http requests
if request.op == Operation.CREATE:
CSE.event.mqttCreate() # type: ignore [attr-defined]
elif request.op == Operation.RETRIEVE:
CSE.event.mqttRetrieve() # type: ignore [attr-defined]
elif request.op == Operation.UPDATE:
CSE.event.mqttUpdate() # type: ignore [attr-defined]
elif request.op == Operation.DELETE:
CSE.event.mqttDelete() # type: ignore [attr-defined]
elif request.op == Operation.NOTIFY:
CSE.event.mqttNotify() # type: ignore [attr-defined]
_t = self.operationEvents[request.op]
_t[0]()

# rename threads
renameThread(_t[1])

try:
responseResult = CSE.request.handleRequest(request)
except Exception as e:
responseResult = Utils.exceptionToResult(e)
responseResult = exceptionToResult(e)
# Send response

# TODO Also change in http
Expand All @@ -246,7 +273,7 @@ def _logRequest(result:Result) -> None:

# Add Originating Timestamp if present in request
if request.ot:
responseResult.request.ot = DateUtils.getResourceDate()
responseResult.request.ot = getResourceDate()

# Transform request to oneM2M request
_sendResponse(responseResult)
Expand All @@ -258,13 +285,31 @@ def _logRequest(result:Result) -> None:
class MQTTClient(object):
""" The general MQTT manager for this CSE.
"""
# TODO doc

__slots__ = (
'mqttConnection',
'isStopped',
'topicsCount',
'mqttConnections',
'receivedResponses',
'receivedResponsesLock',

'enable',
'topicPrefix',
'requestTimeout',
)

# TODO move config handling to event handler

def __init__(self) -> None:
self.enable = Configuration.get('mqtt.enable')
self.topicPrefix = Configuration.get('mqtt.topicPrefix')
self.requestTimeout = Configuration.get('mqtt.timeout')
self.mqttConnection = None

# Get the configuration settings
self._assignConfig()

# Add a handler for configuration changes
CSE.event.addHandler(CSE.event.configUpdate, self.configUpdate) # type: ignore

self.isStopped = False
self.topicsCount = 0
self.mqttConnections:Dict[Tuple[str, int], MQTTConnection] = {}
Expand All @@ -286,6 +331,7 @@ def run(self) -> bool:
if not self.enable or not self.mqttConnection:
L.isInfo and L.log('MQTT: client NOT enabled')
return True
L.isInfo and L.log('Start MQTT client')
self.mqttConnection.run()
if not self.isFullySubscribed(): # This waits until the MQTT Client connects and fully subscribes (until a timeout)
return False
Expand All @@ -295,13 +341,44 @@ def run(self) -> bool:
def shutdown(self) -> bool:
""" Shutdown the MQTTClient.
"""
L.isInfo and L.log('Shutdown MQTT client')
self.isStopped = True
for id in list(self.mqttConnections):
self.disconnectFromMqttBroker(id[0], id[1]) # 0 = address, 1 = port
self.mqttConnection = None
return True


def _assignConfig(self) -> None:
""" Store relevant configuration values in the manager.
"""
self.enable = Configuration.get('mqtt.enable')
self.topicPrefix = Configuration.get('mqtt.topicPrefix')
self.requestTimeout = Configuration.get('mqtt.timeout')


def configUpdate(self, key:Optional[str] = None,
value:Optional[Any] = None) -> None:
""" Callback for the `configUpdate` event.
Args:
key: Name of the updated configuration setting.
value: New value for the config setting.
"""
if key not in [ 'mqtt.enable',
'mqtt.topicPrefix',
'mqtt.timeout',
]:
return

# assign new values
self._assignConfig()

# possibly restart MQTT client
self.shutdown()
self.run()


def pause(self) -> None:
""" Stop handling requests.
"""
Expand All @@ -323,14 +400,14 @@ def unpause(self) -> None:
def isFullySubscribed(self) -> bool:
""" Check whether this mqttConnection is fully subscribed.
"""
return DateUtils.waitFor(self.requestTimeout, lambda:self.mqttConnection.isConnected and self.mqttConnection.subscribedCount == 3) # currently 3 topics
return waitFor(self.requestTimeout, lambda:self.mqttConnection.isConnected and self.mqttConnection.subscribedCount == 3) # currently 3 topics


def isConnected(self) -> bool:
""" Check whether the MQTT client is connected to a broker. Wait for a moment
to take startup connection into account.
"""
return DateUtils.waitFor(self.requestTimeout, lambda:self.mqttConnection.isConnected)
return waitFor(self.requestTimeout, lambda:self.mqttConnection.isConnected)


def connectToMqttBroker(self, address:str, port:int, useTLS:bool, username:str, password:str) -> Optional[MQTTConnection]:
Expand Down Expand Up @@ -383,7 +460,7 @@ def getMqttBroker(self, address:str, port:int) -> MQTTConnection:
def sendMqttRequest(self,
operation:Operation,
url:str, originator:str,
to:str = None, # TODO
to:str = None,
ty:ResourceTypes = None,
content:JSON = None,
parameters:CSERequest = None,
Expand All @@ -409,14 +486,14 @@ def sendMqttRequest(self,
# Pack everything that is needed in a Result object as if this is a normal "response" (for MQTT this doesn't matter)
# This seems to be a bit complicated, but we fill in the necessary values as if this is a normal "response"
req = Result(request = CSERequest())
req.request.id = u.path[1:]
req.request.id = u.path[1:] if u.path[1:] else to
req.request.op = operation
req.resource = content
req.request.originator = originator
req.request.rqi = Utils.uniqueRI()
req.request.rqi = uniqueRI()
if rvi != '1':
req.request.rvi = rvi if rvi is not None else CSE.releaseVersion
req.request.ot = DateUtils.getResourceDate()
req.request.ot = getResourceDate()
req.rsc = ResponseStatusCode.UNKNOWN # explicitly remove the provided OK because we don't want have any
req.request.ct = ct if ct else CSE.defaultSerialization # get the serialization

Expand All @@ -433,7 +510,8 @@ def sendMqttRequest(self,

# Build the topic
if not len(topic):
topic = f'/oneM2M/req/{idToMQTT(CSE.cseCsi)}/{idToMQTT(Utils.toSPRelative(originator))}/{ct.name.lower()}'
topic = f'/oneM2M/req/{idToMQTT(CSE.cseCsi)}/{idToMQTT(toSPRelative(to if to else originator))}/{ct.name.lower()}'
#topic = f'/oneM2M/req/{idToMQTT(CSE.cseCsi)}/{idToMQTT(toSPRelative(originator))}/{ct.name.lower()}'
elif topic.startswith('///'):
topic = f'/oneM2M/req/{idToMQTT(CSE.cseCsi)}/{idToMQTT(pathSplit[3])}/{ct.name.lower()}' # TODO Investigate whether this needs to be SP-Relative as well
elif topic.startswith('//'):
Expand All @@ -453,7 +531,7 @@ def sendMqttRequest(self,
password = mqttPassword)

# Wait a moment until we are connected.
DateUtils.waitFor(self.requestTimeout, lambda: mqttConnection is not None and mqttConnection.isConnected)
waitFor(self.requestTimeout, lambda: mqttConnection is not None and mqttConnection.isConnected)

# We are not connected, so -> fail
if not mqttConnection or not mqttConnection.isConnected:
Expand Down Expand Up @@ -486,12 +564,14 @@ def waitForResponse(self, rqi:str, timeOut:float) -> Tuple[ Result, str ]:
def _receivedResponse() -> bool:
nonlocal resp, topic
with self.receivedResponsesLock:
if not self.receivedResponses:
return False
if rqi in self.receivedResponses:
resp, topic = self.receivedResponses.pop(rqi) # return the response (in a Result object), and remove it from the dict.
return True
return False

if not DateUtils.waitFor(timeOut, _receivedResponse):
if not waitFor(timeOut, _receivedResponse):
return Result.errorResult(rsc = ResponseStatusCode.targetNotReachable, dbg = 'Target not reachable or timeout'), None
CSE.event.responseReceived(resp.request) # type:ignore [attr-defined]
return resp, topic
Expand All @@ -510,7 +590,7 @@ def prepareMqttRequest(inResult:Result,
The constructed and serialized content is returned in a tuple in `Result.data`: the content as a dictionary and the serialized content.
"""
result = RequestUtils.requestFromResult(inResult, originator, ty, op = op, isResponse = isResponse)
result = requestFromResult(inResult, originator, ty, op = op, isResponse = isResponse)

# When raw: Replace the data with its own primitive content, and a couple of headers
if raw and (pc := cast(JSON, result.data).get('pc')):
Expand All @@ -522,9 +602,9 @@ def prepareMqttRequest(inResult:Result,

# Always add the original timestamp in a response
if not result.request.ot:
result.request.ot = DateUtils.getResourceDate()
result.request.ot = getResourceDate()

result.data = (result.data, cast(bytes, RequestUtils.serializeData(cast(JSON, result.data), result.request.ct)))
result.data = (result.data, cast(bytes, serializeData(cast(JSON, result.data), result.request.ct)))
return result


Expand Down Expand Up @@ -564,4 +644,4 @@ def logRequest(reqResult:Result,

body = f'\nBody: {bodyPrint}'

L.isDebug and L.logDebug(f'{prefix}: {topic}{body}')
L.isDebug and L.logDebug(f'{prefix}: {topic}{body}', stackOffset = 1)

0 comments on commit c55d4ba

Please sign in to comment.