Skip to content

Commit

Permalink
Merge pull request #106 from PiBrewing/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
avollkopf authored Mar 30, 2023
2 parents 638b95d + 35252a2 commit da72d61
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 360 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ build
dist
.idea
*.log
cbpi.egg-info
cbpi4.egg-info
*egg-info/
log
venv
cbpi/extension/ui
Expand Down
2 changes: 1 addition & 1 deletion cbpi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "4.1.6"
__version__ = "4.1.7"
__codename__ = "Groundhog Day"

25 changes: 13 additions & 12 deletions cbpi/controller/log_file_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pandas as pd
import zipfile
import base64
import urllib3
from urllib3 import Timeout, PoolManager
from pathlib import Path
from cbpi.api import *
from cbpi.api.config import ConfigType
Expand Down Expand Up @@ -46,43 +46,44 @@ def log_data(self, name: str, value: str) -> None:

formatted_time = strftime("%Y-%m-%d %H:%M:%S", localtime())
self.datalogger[name].info("%s,%s" % (formatted_time, str(value)))

if self.influxdb == "Yes":
## Write to influxdb in an asyncio task
self._task = asyncio.create_task(self.log_influx(name,value))

async def log_influx(self, name:str, value:str):
self.influxdbcloud = self.cbpi.config.get("INFLUXDBCLOUD", "No")
self.influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None)
self.influxdbport = self.cbpi.config.get("INFLUXDBPORT", None)
self.influxdbname = self.cbpi.config.get("INFLUXDBNAME", None)
self.influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
self.influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
self.influxdbmeasurement = self.cbpi.config.get("INFLUXDBMEASUREMENT", "measurement")

id = name
timeout = Timeout(connect=5.0, read=None)
try:
chars = {'ö':'oe','ä':'ae','ü':'ue','Ö':'Oe','Ä':'Ae','Ü':'Ue'}
sensor=self.cbpi.sensor.find_by_id(name)
if sensor is not None:
itemname=sensor.name.replace(" ", "_")
for char in chars:
itemname = itemname.replace(char,chars[char])
out=str(self.influxdbmeasurement)+",source=" + itemname + ",itemID=" + str(id) + " value="+str(value)
except Exception as e:
logging.error("InfluxDB ID Error: {}".format(e))

if self.influxdbcloud == "Yes":
self.influxdburl="https://" + self.influxdbaddr + "/api/v2/write?org=" + self.influxdbuser + "&bucket=" + self.influxdbname + "&precision=s"
self.influxdburl=self.influxdbaddr + "/api/v2/write?org=" + self.influxdbuser + "&bucket=" + self.influxdbname + "&precision=s"
try:
header = {'User-Agent': name, 'Authorization': "Token {}".format(self.influxdbpwd)}
http = urllib3.PoolManager()
req = http.request('POST',self.influxdburl, body=out, headers = header)
http = PoolManager(timeout=timeout)
req = http.request('POST',self.influxdburl, body=out.encode(), headers = header)
except Exception as e:
logging.error("InfluxDB cloud write Error: {}".format(e))

else:
self.base64string = base64.b64encode(('%s:%s' % (self.influxdbuser,self.influxdbpwd)).encode())
self.influxdburl='http://' + self.influxdbaddr + ':' + str(self.influxdbport) + '/write?db=' + self.influxdbname
self.influxdburl= self.influxdbaddr + '/write?db=' + self.influxdbname
try:
header = {'User-Agent': name, 'Content-Type': 'application/x-www-form-urlencoded','Authorization': 'Basic %s' % self.base64string.decode('utf-8')}
http = urllib3.PoolManager()
req = http.request('POST',self.influxdburl, body=out, headers = header)
http = PoolManager(timeout=timeout)
req = http.request('POST',self.influxdburl, body=out.encode(), headers = header)
except Exception as e:
logging.error("InfluxDB write Error: {}".format(e))

Expand Down
14 changes: 11 additions & 3 deletions cbpi/controller/satellite_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ async def listen(self):
method = topic_filter[1]
if message.topic.matches(topic):
await (method(message))
except asyncio.CancelledError:
# Cancel
self.logger.warning("MQTT Listening Cancelled")
#break
except MqttError as e:
self.logger.error("MQTT Exception: {}".format(e))
except Exception as e:
Expand Down Expand Up @@ -153,6 +157,7 @@ def subcribe(self, topic, method):
return task

async def _subcribe(self, topic, method):
self.error=False
while True:
try:
if self.client._connected.done():
Expand All @@ -163,11 +168,14 @@ async def _subcribe(self, topic, method):
await method(message.payload.decode())
except asyncio.CancelledError:
# Cancel
self.logger.warning("Sub Cancelled")
self.logger.warning("Subscription {} Cancelled".format(topic))
self.error=True
except MqttError as e:
self.logger.error("Sub MQTT Exception: {}".format(e))
except Exception as e:
self.logger.error("Sub Exception: {}".format(e))

# wait before try to resubscribe
await asyncio.sleep(5)
if self.error == True:
break
else:
await asyncio.sleep(5)
23 changes: 13 additions & 10 deletions cbpi/extension/ConfigUpdate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def run(self):
logfiles = self.cbpi.config.get("CSVLOGFILES", None)
influxdb = self.cbpi.config.get("INFLUXDB", None)
influxdbaddr = self.cbpi.config.get("INFLUXDBADDR", None)
influxdbport = self.cbpi.config.get("INFLUXDBPORT", None)
#influxdbport = self.cbpi.config.get("INFLUXDBPORT", None)
influxdbname = self.cbpi.config.get("INFLUXDBNAME", None)
influxdbuser = self.cbpi.config.get("INFLUXDBUSER", None)
influxdbpwd = self.cbpi.config.get("INFLUXDBPWD", None)
Expand All @@ -52,6 +52,7 @@ async def run(self):
SENSOR_LOG_MAX_BYTES = self.cbpi.config.get("SENSOR_LOG_MAX_BYTES", None)
slow_pipe_animation = self.cbpi.config.get("slow_pipe_animation", None)
NOTIFY_ON_ERROR = self.cbpi.config.get("NOTIFY_ON_ERROR", None)
PLAY_BUZZER = self.cbpi.config.get("PLAY_BUZZER", None)
BoilAutoTimer = self.cbpi.config.get("BoilAutoTimer", None)


Expand Down Expand Up @@ -224,15 +225,7 @@ async def run(self):
if influxdbaddr is None:
logger.info("INIT Influxdbaddr")
try:
await self.cbpi.config.add("INFLUXDBADDR", "localhost", ConfigType.STRING, "IP Address of your influxdb server (If INFLUXDBCLOUD set to Yes use URL Address of your influxdb cloud server)")
except:
logger.warning('Unable to update config')

## Check if influxdbport is in config
if influxdbport is None:
logger.info("INIT Influxdbport")
try:
await self.cbpi.config.add("INFLUXDBPORT", "8086", ConfigType.STRING, "Port of your influxdb server")
await self.cbpi.config.add("INFLUXDBADDR", "http://localhost:8086", ConfigType.STRING, "URL Address of your influxdb server (If INFLUXDBCLOUD set to Yes use URL Address of your influxdb cloud server)")
except:
logger.warning('Unable to update config')

Expand Down Expand Up @@ -336,6 +329,16 @@ async def run(self):
except:
logger.warning('Unable to update config')

## Check if PLAY_BUZZER is in config
if PLAY_BUZZER is None:
logger.info("INIT PLAY_BUZZER")
try:
await self.cbpi.config.add("PLAY_BUZZER", "No", ConfigType.SELECT, "Play buzzer sound in Web interface on Notifications",
[{"label": "Yes", "value": "Yes"},
{"label": "No", "value": "No"}])
except:
logger.warning('Unable to update config')

if BoilAutoTimer is None:
logging.info("INIT BoilAutoTimer")
try:
Expand Down
68 changes: 65 additions & 3 deletions cbpi/extension/httpsensor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
cache = {}

@parameters([Property.Text(label="Key", configurable=True, description="Http Key"),
Property.Number(label="Timeout", configurable="True",unit="sec",description="Timeout in seconds to send notification (default:60 | deactivated: 0)")
])
Property.Number(label="Timeout", configurable="True",unit="sec",description="Timeout in seconds to send notification (default:60 | deactivated: 0)"),
Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive (only Kettle or Fermenter to be selected)"),
Property.Fermenter(label="Fermenter", description="Reduced logging in seconds if Fermenter is inactive (only Kettle or Fermenter to be selected)"),
Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default: 60 sec | disabled: 0)")])

class HTTPSensor(CBPiSensor):
def __init__(self, cbpi, id, props):
super(HTTPSensor, self).__init__(cbpi, id, props)
Expand All @@ -25,6 +28,19 @@ def __init__(self, cbpi, id, props):
self.sensor=self.get_sensor(self.id)
self.lastdata=time.time()

self.lastlog=0
self.reducedfrequency=int(self.props.get("ReducedLogging", 60))
if self.reducedfrequency < 0:
self.reducedfrequency = 0

self.kettleid=self.props.get("Kettle", None)
self.fermenterid=self.props.get("Fermenter", None)
self.reducedlogging = True if self.kettleid or self.fermenterid else False

if self.kettleid is not None and self.fermenterid is not None:
self.reducedlogging=False
self.cbpi.notify("HTTPSensor", "Sensor '" + str(self.sensor.name) + "' cant't have Fermenter and Kettle defined for reduced logging.", NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)])

async def Confirm(self, **kwargs):
self.nextchecktime = time.time() + self.timeout
self.notificationsend = False
Expand All @@ -41,6 +57,8 @@ async def run(self):
In this example the code is executed every second
'''
while self.running is True:
self.kettle = self.get_kettle(self.kettleid) if self.kettleid is not None else None
self.fermenter = self.get_fermenter(self.fermenterid) if self.fermenterid is not None else None
if self.timeout !=0:
currenttime=time.time()
if currenttime > self.nextchecktime and self.notificationsend == False:
Expand All @@ -51,6 +69,13 @@ async def run(self):
if cache_value is not None:
self.value = float(cache_value)
self.push_update(self.value)

if self.reducedlogging:
await self.logvalue()
else:
self.log_data(self.value)
self.lastlog = time.time()

if self.timeout !=0:
self.nextchecktime = currenttime + self.timeout
self.notificationsend = False
Expand All @@ -60,13 +85,50 @@ async def run(self):
pass
await asyncio.sleep(1)

async def logvalue(self):
now=time.time()
if self.kettle is not None:
try:
kettlestatus=self.kettle.instance.state
except:
kettlestatus=False
if kettlestatus:
self.log_data(self.value)
logging.info("Kettle Active")
self.lastlog = time.time()
else:
logging.info("Kettle Inactive")
if self.reducedfrequency != 0:
if now >= self.lastlog + self.reducedfrequency:
self.log_data(self.value)
self.lastlog = time.time()
logging.info("Logged with reduced freqency")
pass

if self.fermenter is not None:
try:
fermenterstatus=self.fermenter.instance.state
except:
fermenterstatus=False
if fermenterstatus:
self.log_data(self.value)
logging.info("Fermenter Active")
self.lastlog = time.time()
else:
logging.info("Fermenter Inactive")
if self.reducedfrequency != 0:
if now >= self.lastlog + self.reducedfrequency:
self.log_data(self.value)
self.lastlog = time.time()
logging.info("Logged with reduced freqency")
pass

def get_state(self):
# return the current state of the sensor
return dict(value=self.value)

class HTTPSensorEndpoint(CBPiExtension):


def __init__(self, cbpi):
'''
Initializer
Expand Down
84 changes: 74 additions & 10 deletions cbpi/extension/mqtt_sensor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
@parameters([Property.Text(label="Topic", configurable=True, description="MQTT Topic"),
Property.Text(label="PayloadDictionary", configurable=True, default_value="",
description="Where to find msg in payload, leave blank for raw payload"),
Property.Number(label="Timeout", configurable="True",unit="sec",
Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive (only Kettle or Fermenter to be selected)"),
Property.Fermenter(label="Fermenter", description="Reduced logging in seconds if Fermenter is inactive (only Kettle or Fermenter to be selected)"),
Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default:60 sec | 0 disabled)"),
Property.Number(label="Timeout", configurable=True, unit="sec",
description="Timeout in seconds to send notification (default:60 | deactivated: 0)")])
class MQTTSensor(CBPiSensor):

Expand All @@ -28,8 +31,19 @@ def __init__(self, cbpi, id, props):
self.notificationsend = False
self.nextchecktime=self.starttime+self.timeout
self.lastdata=time.time()
self.lastlog=0
self.sensor=self.get_sensor(self.id)
self.reducedfrequency=int(self.props.get("ReducedLogging", 60))
if self.reducedfrequency < 0:
self.reducedfrequency = 0
self.kettleid=self.props.get("Kettle", None)
self.fermenterid=self.props.get("Fermenter", None)
self.reducedlogging = True if self.kettleid or self.fermenterid else False

if self.kettleid is not None and self.fermenterid is not None:
self.reducedlogging=False
self.cbpi.notify("MQTTSensor", "Sensor '" + str(self.sensor.name) + "' cant't have Fermenter and Kettle defined for reduced logging.", NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)])

async def Confirm(self, **kwargs):
self.nextchecktime = time.time() + self.timeout
self.notificationsend = False
Expand All @@ -49,14 +63,60 @@ async def on_message(self, message):

if isinstance(val, (int, float, str)):
self.value = float(val)
self.log_data(self.value)
self.push_update(self.value)
if self.reducedlogging == True:
await self.logvalue()
else:
logging.info("MQTTSensor {} regular logging".format(self.sensor.name))
self.log_data(self.value)
self.lastlog = time.time()

if self.timeout !=0:
self.nextchecktime = time.time() + self.timeout
self.notificationsend = False
self.lastdata=time.time()
except Exception as e:
logging.info("MQTT Sensor Error {}".format(e))
logging.error("MQTT Sensor Error {}".format(e))

async def logvalue(self):
self.kettle = self.get_kettle(self.kettleid) if self.kettleid is not None else None
self.fermenter = self.get_fermenter(self.fermenterid) if self.fermenterid is not None else None
now=time.time()
if self.kettle is not None:
try:
kettlestatus=self.kettle.instance.state
except:
kettlestatus=False
if kettlestatus:
self.log_data(self.value)
logging.info("MQTTSensor {} Kettle Active".format(self.sensor.name))
self.lastlog = time.time()
else:
logging.info("MQTTSensor {} Kettle Inactive".format(self.sensor.name))
if self.reducedfrequency != 0:
if now >= self.lastlog + self.reducedfrequency:
self.log_data(self.value)
self.lastlog = time.time()
logging.info("Logged with reduced freqency")
pass

if self.fermenter is not None:
try:
fermenterstatus=self.fermenter.instance.state
except:
fermenterstatus=False
if fermenterstatus:
self.log_data(self.value)
logging.info("MQTTSensor {} Fermenter Active".format(self.sensor.name))
self.lastlog = time.time()
else:
logging.info("MQTTSensor {} Fermenter Inactive".format(self.sensor.name))
if self.reducedfrequency != 0:
if now >= self.lastlog + self.reducedfrequency:
self.log_data(self.value)
self.lastlog = time.time()
logging.info("Logged with reduced freqency")
pass

async def run(self):
while self.running:
Expand All @@ -70,13 +130,17 @@ def get_state(self):
return dict(value=self.value)

async def on_stop(self):
if self.mqtt_task.done() is False:
self.mqtt_task.cancel()
try:
await self.mqtt_task
except asyncio.CancelledError:
pass

was_cancelled=False
if not self.mqtt_task.done():
logging.info("Task not done -> cancelling")
was_cancelled = self.mqtt_task.cancel()
try:
logging.info("Trying to call cancelled task")
await self.mqtt_task
except asyncio.CancelledError:
logging.info("Task has been Cancelled")
pass
logging.info("Task cancelled: {}".format(was_cancelled))

def setup(cbpi):
'''
Expand Down
Loading

0 comments on commit da72d61

Please sign in to comment.