forked from enesbcs/rpieasy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
_C001_DomoHTTP.py
127 lines (120 loc) · 3.97 KB
/
_C001_DomoHTTP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
#############################################################################
############### Domoticz HTTP/HTTPS Controller for RPIEasy ##################
#############################################################################
#
# Only one-way data sending is supported, for obvious reasons.
#
# Copyright (C) 2018-2019 by Alexander Nagy - https://bitekmindenhol.blog.hu/
#
import controller
import rpieGlobals
from helper_domoticz import *
import misc
import urllib.request
from multiprocessing import Process
import base64
import webserver
class Controller(controller.ControllerProto):
    """Controller that pushes task values to Domoticz through its JSON URL API.

    Data flows one way only: every value is sent as a single GET request
    to ``/json.htm`` on the configured controller host and port.
    """

    CONTROLLER_ID = 1
    CONTROLLER_NAME = "Domoticz HTTP"

    def __init__(self, controllerindex):
        """Initialise the controller and enable the IDX/account/password fields."""
        controller.ControllerProto.__init__(self, controllerindex)
        self.usesID = True
        self.usesAccount = True
        self.usesPassword = True
        # 0 = HTTP, 1 = HTTPS/auto negotiation, 2 = HTTPS/disable verify
        self.authmode = 0

    def webform_load(self):
        """Add the connection "Mode" selector to the settings form.

        Returns:
            True, always.
        """
        # Fall back to plain HTTP when the attribute is missing on this instance.
        current_mode = getattr(self, "authmode", 0)
        options = ["HTTP", "HTTPS/auto negotiation", "HTTPS/disable verify"]
        optionvalues = [0, 1, 2]
        webserver.addFormSelector(
            "Mode", "c001_mode", len(optionvalues), options, optionvalues,
            None, int(current_mode)
        )
        return True

    def webform_save(self, params):
        """Store the connection mode submitted by the settings form.

        Args:
            params: submitted form parameters, handed to ``webserver.arg``.

        Returns:
            True, always. A missing or non-numeric mode resets to 0 (HTTP).
        """
        try:
            self.authmode = int(webserver.arg("c001_mode", params))
        except Exception:
            # Best effort: whatever goes wrong while reading the field,
            # keep the controller usable in plain HTTP mode.
            self.authmode = 0
        return True

    def senddata(self, idx, sensortype, value, userssi=-1, usebattery=-1, tasknum=-1, changedvalue=-1):
        """Build the Domoticz request for one reading and send it in the background.

        Args:
            idx: Domoticz device index; must not be zero.
            sensortype: sensor type code, compared with the
                ``rpieGlobals.SENSOR_TYPE_*`` constants.
            value: sequence of values; switches and dimmers use ``value[0]``.
            userssi: signal strength, converted by ``mapRSSItoDomoticz``.
            usebattery: battery level 0..100; -1 or 255 means "not supported",
                in which case ``misc.get_battery_value()`` is reported instead.
            tasknum: unused by this controller.
            changedvalue: unused by this controller.

        Nothing is sent while the controller is disabled.
        """
        if not self.enabled:
            return
        if int(idx) == 0:
            misc.addLog(rpieGlobals.LOG_LEVEL_ERROR, "HTTP : IDX cannot be zero!")
            return

        # Anything that cannot be read as a number counts as "no battery info".
        try:
            battery = int(float(usebattery))
        except (TypeError, ValueError, OverflowError):
            battery = -1

        stype = int(sensortype)
        if stype == rpieGlobals.SENSOR_TYPE_SWITCH:
            url = "/json.htm?type=command&param=switchlight&idx=" + str(idx)
            url += "&switchcmd="
            url += "Off" if round(float(value[0])) == 0 else "On"
        elif stype == rpieGlobals.SENSOR_TYPE_DIMMER:
            url = "/json.htm?type=command&param=switchlight&idx=" + str(idx)
            url += "&switchcmd="
            if float(value[0]) == 0:
                url += "Off"
            else:
                url += "Set%20Level&level=" + str(value[0])
        else:
            url = "/json.htm?type=command&param=udevice&idx=" + str(idx)
            url += "&nvalue=0&svalue=" + str(formatDomoticzSensorType(sensortype, value))

        url += "&rssi=" + str(mapRSSItoDomoticz(userssi))
        if battery in (-1, 255):
            # Battery input is 0..100%; 255 means not supported.
            battery = int(misc.get_battery_value())
        url += "&battery=" + str(battery)

        # The scheme is prepended later by urlget(), according to authmode.
        urlstr = str(self.controllerip) + ":" + str(self.controllerport) + url + self.getaccountstr()
        misc.addLog(rpieGlobals.LOG_LEVEL_DEBUG, urlstr)
        # Use a separate process so a slow or dead server cannot block the caller.
        httpproc = Process(target=self.urlget, args=(urlstr,))
        httpproc.start()

    def urlget(self, url):
        """Request ``url`` using the scheme selected by ``authmode``.

        Args:
            url: ``host:port/path?query`` without a scheme prefix.

        Returns:
            False when the request cannot even be attempted (ssl module
            missing, or unknown mode); None otherwise. Connection failures
            are logged, never raised.
        """
        mode = getattr(self, "authmode", 0)
        ctx = None
        if mode == 0:
            url = "http://" + str(url)
        elif mode in (1, 2):
            url = "https://" + str(url)
            try:
                import ssl
            except ImportError:
                misc.addLog(rpieGlobals.LOG_LEVEL_ERROR, "OpenSSL is not reachable!")
                return False
            if mode == 2:
                # Certificate and hostname checks deliberately switched off.
                ctx = ssl.create_default_context()
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            else:
                ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        else:
            misc.addLog(
                rpieGlobals.LOG_LEVEL_ERROR,
                "Controller: " + str(self.controllerip) + " unknown connection mode " + str(mode)
            )
            return False

        try:
            if ctx is None:
                response = urllib.request.urlopen(url, None, 2)
            else:
                response = urllib.request.urlopen(url, None, 2, context=ctx)
            # The reply body is not used; just make sure the connection is closed.
            with response:
                pass
        except Exception as e:
            misc.addLog(
                rpieGlobals.LOG_LEVEL_ERROR,
                "Controller: " + str(self.controllerip) + " connection failed " + str(e)
            )

    def getaccountstr(self):
        """Return the credential query-string suffix for the request URL.

        Returns:
            ``"&username=<b64>&password=<b64>"`` with both values base64
            encoded, or an empty string when neither user nor password is set.
        """
        if self.controlleruser == "" and self.controllerpassword == "":
            return ""
        acc = base64.b64encode(bytes(self.controlleruser, "utf-8")).decode("utf-8")
        pw = base64.b64encode(bytes(self.controllerpassword, "utf-8")).decode("utf-8")
        return "&username=" + acc + "&password=" + pw