Skip to content

Commit

Permalink
Flexible formatting for MQTT Payloads, add Current support.
Browse files Browse the repository at this point in the history
Issue matthewwall#8 requested support for timestamps in the values sent to MQTT.
This checkin adds support for Plaintext and JSON style encodings of the
measurement timestamp data.

Add support for Current values if the underlying system supports it.

Signed-off-by: Mark Clark <[email protected]> (github: mrguessed)
  • Loading branch information
mrguessed committed Oct 2, 2016
1 parent 9a8d3ef commit 775c7c3
Showing 1 changed file with 72 additions and 13 deletions.
85 changes: 72 additions & 13 deletions bin/btmon.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@
<base_topic> /ch <channel-n> _w -> <power-value>
<base_topic> /ch <channel-n> _wh -> <energy-value>
<base_topic> /ch <channel-n> _dwh -> <delta-energy-value>
<base_topic> /ch <channel-n> _a -> <current-value>
for each temperature channel:
<base_topic> /t <temperature-n> -> <temperature-value>
Expand All @@ -678,8 +679,8 @@
{"topic":"/house/energy/399999_p1", "payload":3141592},
{"topic":"/house/energy/399999_t1", "payload":79.500}]
Temperature, Pulse and Voltage values are emitted only if supported by the
underlying device.
Current, Temperature, Pulse and Voltage values are emitted only if supported
by the underlying device.
By configuring the mqtt_map parameter, you can set this up to provide output
like this:
Expand All @@ -700,11 +701,34 @@
mqtt_port=1883
mqtt_clientid=btmon
mqtt_base_topic=/house/energy
mqtt_value_format={metric.value:.3f}
mqtt_user=energy
mqtt_passwd=please_set_me
mqtt_map=399999_ch1_w,solar,399999_ch1_wh,solar_wh,399999_t1,garage
mqtt_upload_period=60
The format of the MQTT Payload can be altered to support different content.
This can be formatted using standard Python str.format() style format strings.
The following tokens are understood within the mqtt_format_value:
* metric.channel (str) the name of the channel
* metric.serial (str) the serial# of the device
* metric.serial_alt (str) the obfuscated serial#
* metric.label (str) the label to uniquely identify the device/channel
* metric.label_alt (str) the obfuscated label
* metric.name = (str) the mapped name of the metric
* metric.value = (float) payload value
* metric.timestamp = (float) epoch-based payload timestamp
* metric.timestamp_alt = (datetime) object version of payload timestamp
Value payload (default):
mqtt_value_format={metric.value:.3f}
Grafana payload:
mqtt_value_format={metric.value:.3f} {metric.timestamp}
JSON payload:
mqtt_value_format={{"value":{metric.value:.3f}, "timestamp":{metric.timestamp}}}
Upgrading:
Expand Down Expand Up @@ -1201,11 +1225,15 @@
MQTT_RETAIN = False
MQTT_MAP = ''
MQTT_UPLOAD_PERIOD = MINUTE
MQTT_VALUE_FORMAT = "{metric.value:.3f}"
MQTT_VALUE_ESCAPE_RAW = 'raw'
MQTT_VALUE_ESCAPE_JSON = 'json'


import base64
import bisect
import calendar
import datetime
import errno
import optparse
import socket
Expand Down Expand Up @@ -4058,7 +4086,8 @@ def _handle_urlopen_error(self, e, url, payload):

class MQTTProcessor(BaseProcessor):
def __init__(self, host, port, clientid, base_topic, qos, retain,
will, user, passwd, tls, map, period):
value_format, value_escape, will, user, passwd, tls,
map, period):
if not publish:
print 'MQTT Error: paho.mqtt.publish module could not be imported.'
sys.exit(1)
Expand All @@ -4070,6 +4099,8 @@ def __init__(self, host, port, clientid, base_topic, qos, retain,
self.base_topic = base_topic
self.qos = int(qos)
self.retain = retain
self.value_format = value_format
self.value_escape = value_escape
self.will = will
self.user = user
self.passwd = passwd
Expand All @@ -4084,6 +4115,8 @@ def __init__(self, host, port, clientid, base_topic, qos, retain,
infmsg('MQTT: topic: %s' % self.base_topic)
infmsg('MQTT: qos: %d' % self.qos)
infmsg('MQTT: retain: %s' % self.retain)
infmsg('MQTT: value_format: %s' % self.value_format)
infmsg('MQTT: value_escape: %s' % self.value_escape)
infmsg('MQTT: will: %s' % (self.will or '<not-specified>'))
infmsg('MQTT: upload period: %d' % self.process_period)
infmsg('MQTT: map: %s' % self.map_str)
Expand All @@ -4095,6 +4128,13 @@ def setup(self):
if self.qos not in (0, 1, 2):
print 'MQTT Error: qos values are 0, 1 or 2'
sys.exit(1)
if not self.value_format:
print 'MQTT Error: mqtt-value-format empty'
sys.exit(1)
if self.value_escape not in (MQTT_VALUE_ESCAPE_RAW, MQTT_VALUE_ESCAPE_JSON):
print 'MQTT Error: mqtt-value-escape must be %s or %s' % (
MQTT_VALUE_ESCAPE_RAW, MQTT_VALUE_ESCAPE_JSON)
sys.exit(1)

self.map = pairs2dict(self.map_str)
if (self.user == None):
Expand All @@ -4117,21 +4157,36 @@ def setup(self):
sys.exit(1)

def _add_msg(self, packet, channel, payload):
if not payload:
return
key = mklabel(packet['serial'], channel)
if key in self.map:
key = self.map[key]
self._msgs.append({'topic': '%s/%s' % (self.base_topic, key),
'payload': round(payload, 3),
'qos': self.qos,
'retain': self.retain})
if payload == None:
return
metric = lambda: None
metric.channel = channel
metric.serial = packet['serial']
metric.serial_alt = obfuscate_serial(metric.serial)
metric.label = mklabel(metric.serial, channel)
metric.label_alt = mklabel(metric.serial_alt, channel)
metric.name = self.map.get(metric.label) or metric.label_alt
metric.value = payload
metric.timestamp = packet['time_created']
metric.timestamp_alt = datetime.datetime.fromtimestamp(metric.timestamp)
name = metric.name
if self.value_escape == MQTT_VALUE_ESCAPE_JSON:
for key, val in metric.__dict__.iteritems():
if isinstance(val, str):
setattr(metric, key, json.dumps(val))
self._msgs.append({'topic': '%s/%s' % (self.base_topic, name),
'payload': self.value_format.format(metric = metric),
'qos': self.qos,
'retain': self.retain})

def process_calculated(self, packets):
self._msgs = []
filters = [FILTER_POWER, FILTER_ENERGY, FILTER_PULSE, FILTER_SENSOR]
if INCLUDE_CURRENT:
filters.append(FILTER_CURRENT)
for p in packets:
self._add_msg(p, 'volts', p['volts'])
for f in [FILTER_POWER, FILTER_ENERGY, FILTER_PULSE, FILTER_SENSOR]:
for f in filters:
for c in PACKET_FORMAT.channels(f):
self._add_msg(p, c, p[c])
# Delta Wh
Expand Down Expand Up @@ -4363,6 +4418,8 @@ def process_calculated(self, packets):
group.add_option('--mqtt-base-topic', help='base topic', metavar='TOPIC')
group.add_option('--mqtt-qos', type='int', help='quality of service', metavar='QOS')
group.add_option('--mqtt-retain', action='store_true', help='retain msg as last good value', metavar='RETAIN')
group.add_option('--mqtt-value-format', help='format to apply to value', metavar='VALUE_FORMAT')
group.add_option('--mqtt-value-escape', help='encode the value (raw, json)', metavar='VALUE_ESCAPE')
group.add_option('--mqtt-will', help='mqtt will for the client', metavar='{"topic": "<topic>", "payload":"<payload>", "qos":<qos>, "retain":<retain>}')
group.add_option('--mqtt-user', help='user', metavar='USERNAME')
group.add_option('--mqtt-passwd', help='password', metavar='PASSWORD')
Expand Down Expand Up @@ -4707,6 +4764,8 @@ def process_calculated(self, packets):
options.mqtt_base_topic or MQTT_BASE_TOPIC,
options.mqtt_qos or MQTT_QOS,
options.mqtt_retain or MQTT_RETAIN,
options.mqtt_value_format or MQTT_VALUE_FORMAT,
options.mqtt_value_escape or MQTT_VALUE_ESCAPE_RAW,
options.mqtt_will,
options.mqtt_user,
options.mqtt_passwd,
Expand Down

0 comments on commit 775c7c3

Please sign in to comment.