Skip to content

Commit

Permalink
Rework BulbDevice to use set_value()/set_multiple_values() instead of…
Browse files Browse the repository at this point in the history
… building the payload directly
  • Loading branch information
uzlonewolf committed Jul 14, 2024
1 parent adad4cd commit 1f07086
Showing 1 changed file with 50 additions and 100 deletions.
150 changes: 50 additions & 100 deletions tinytuya/BulbDevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,7 @@
result = state():
Inherited
json = status() # returns json payload
set_version(version) # 3.1 [default] or 3.3
set_socketPersistent(False/True) # False [default] or True
set_socketNODELAY(False/True) # False or True [default]
set_socketRetryLimit(integer) # retry count limit [default 5]
set_socketTimeout(timeout) # set connection timeout in seconds [default 5]
set_dpsUsed(dps_to_request) # add data points (DPS) to request
add_dps_to_request(index) # add data point (DPS) index set to None
set_retry(retry=True) # retry if response payload is truncated
set_status(on, switch=1, nowait) # Set status of switch to 'on' or 'off' (bool)
set_value(index, value, nowait) # Set int value of any index.
heartbeat(nowait) # Send heartbeat to device
updatedps(index=[1], nowait) # Send updatedps command to device
turn_on(switch=1, nowait) # Turn on device / switch #
turn_off(switch=1, nowait) # Turn off
set_timer(num_secs, nowait) # Set timer for num_secs
set_debug(toggle, color) # Activate verbose debugging output
set_sendWait(num_secs) # Time to wait after sending commands before pulling response
detect_available_dps() # Return list of DPS available from device
generate_payload(command, data) # Generate TuyaMessage payload for command with data
send(payload) # Send payload to device (do not wait for response)
receive() # Receive payload from device
Every device function from core.py
"""

import colorsys
Expand Down Expand Up @@ -136,36 +115,33 @@ def _rgb_to_hexvalue(r, g, b, bulb="A"):

# Bulb Type A
if bulb == "A":
# h:0-360,s:0-255,v:0-255|hsv|
hexvalue = ""
for value in rgb:
temp = str(hex(int(value))).replace("0x", "")
if len(temp) == 1:
temp = "0" + temp
hexvalue = hexvalue + temp

hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)]
hexvalue_hsv = ""
for value in hsvarray:
temp = str(hex(int(value))).replace("0x", "")
if len(temp) == 1:
temp = "0" + temp
hexvalue_hsv = hexvalue_hsv + temp
if len(hexvalue_hsv) == 7:
hexvalue = hexvalue + "0" + hexvalue_hsv
else:
hexvalue = hexvalue + "00" + hexvalue_hsv

# r:0-255,g:0-255,b:0-255|rgb|
for rgb_value in rgb:
value = int(rgb_value)
if value < 0 or value > 255:
raise ValueError(f"Bulb type {bulb} must have RGB values 0-255.")
hexvalue += '%02x' % value

# h:0-360,s:0-255,v:0-255|hsv|
hexvalue += '%04x' % int(hsv[0] * 360)
hexvalue += '%02x' % int(hsv[1] * 255)
hexvalue += '%02x' % int(hsv[2] * 255)

# Bulb Type B
if bulb == "B":
elif bulb == "B":
# h:0-360,s:0-1000,v:0-1000|hsv|
hexvalue = ""
hsvarray = [int(hsv[0] * 360), int(hsv[1] * 1000), int(hsv[2] * 1000)]
for value in hsvarray:
temp = str(hex(int(value))).replace("0x", "")
while len(temp) < 4:
temp = "0" + temp
hexvalue = hexvalue + temp
for hsv_value in hsvarray:
value = int(hsv_value)
if value < 0 or value > 1000:
raise ValueError(f"Bulb type {bulb} must have RGB values 0-255.")
hexvalue += '%04x' % int(value)
else:
# Unsupported bulb type
raise ValueError(f"Unsupported bulb type {bulb} - unable to determine hexvalue.")

return hexvalue

Expand Down Expand Up @@ -273,11 +249,7 @@ def set_mode(self, mode="white", nowait=False):
mode(string): white,colour,scene,music
nowait(bool): True to send without waiting for response.
"""
payload = self.generate_payload(
CONTROL, {self.DPS_INDEX_MODE[self.bulb_type]: mode}
)
data = self._send_receive(payload, getresponse=(not nowait))
return data
return self.set_value( self.DPS_INDEX_MODE[self.bulb_type], mode, nowait=nowait )

def set_scene(self, scene, nowait=False):
"""
Expand All @@ -301,11 +273,7 @@ def set_scene(self, scene, nowait=False):
else:
s = self.DPS_MODE_SCENE_4

payload = self.generate_payload(
CONTROL, {self.DPS_INDEX_MODE[self.bulb_type]: s}
)
data = self._send_receive(payload, getresponse=(not nowait))
return data
return self.set_value( self.DPS_INDEX_MODE[self.bulb_type], s, nowait=nowait )

def set_colour(self, r, g, b, nowait=False):
"""
Expand Down Expand Up @@ -338,15 +306,12 @@ def set_colour(self, r, g, b, nowait=False):

hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b, self.bulb_type)

payload = self.generate_payload(
CONTROL,
{
self.DPS_INDEX_MODE[self.bulb_type]: self.DPS_MODE_COLOUR,
self.DPS_INDEX_COLOUR[self.bulb_type]: hexvalue,
},
)
data = self._send_receive(payload, getresponse=(not nowait))
return data
payload = {
self.DPS_INDEX_MODE[self.bulb_type]: self.DPS_MODE_COLOUR,
self.DPS_INDEX_COLOUR[self.bulb_type]: hexvalue,
}

return self.set_multiple_values( payload, nowait=nowait )

def set_hsv(self, h, s, v, nowait=False):
"""
Expand Down Expand Up @@ -377,19 +342,8 @@ def set_hsv(self, h, s, v, nowait=False):
)

(r, g, b) = colorsys.hsv_to_rgb(h, s, v)
hexvalue = BulbDevice._rgb_to_hexvalue(
r * 255.0, g * 255.0, b * 255.0, self.bulb_type
)

payload = self.generate_payload(
CONTROL,
{
self.DPS_INDEX_MODE[self.bulb_type]: self.DPS_MODE_COLOUR,
self.DPS_INDEX_COLOUR[self.bulb_type]: hexvalue,
},
)
data = self._send_receive(payload, getresponse=(not nowait))
return data

return self.set_colour( r * 255.0, g * 255.0, b * 255.0, nowait=nowait )

def set_white_percentage(self, brightness=100, colourtemp=0, nowait=False):
"""
Expand Down Expand Up @@ -466,17 +420,13 @@ def set_white(self, brightness=-1, colourtemp=-1, nowait=False):
"set_white: The colour temperature needs to be between 0 and 1000.",
)

payload = self.generate_payload(
CONTROL,
{
self.DPS_INDEX_MODE[self.bulb_type]: self.DPS_MODE_WHITE,
self.DPS_INDEX_BRIGHTNESS[self.bulb_type]: brightness,
self.DPS_INDEX_COLOURTEMP[self.bulb_type]: colourtemp,
},
)
payload = {
self.DPS_INDEX_MODE[self.bulb_type]: self.DPS_MODE_WHITE,
self.DPS_INDEX_BRIGHTNESS[self.bulb_type]: brightness,
self.DPS_INDEX_COLOURTEMP[self.bulb_type]: colourtemp,
}

data = self._send_receive(payload, getresponse=(not nowait))
return data
return self.set_multiple_values( payload, nowait=nowait )

def set_brightness_percentage(self, brightness=100, nowait=False):
"""
Expand Down Expand Up @@ -520,31 +470,35 @@ def set_brightness(self, brightness, nowait=False):
# Determine which mode bulb is in and adjust accordingly
state = self.state()
data = None
msg = 'set_brightness: '

if "mode" in state:
if state["mode"] == "white":
# for white mode use DPS for brightness
if not self.has_brightness:
log.debug("set_brightness: Device does not appear to support brightness.")
# return error_json(ERR_FUNCTION, "set_brightness: Device does not support brightness.")
payload = self.generate_payload(
CONTROL, {self.DPS_INDEX_BRIGHTNESS[self.bulb_type]: brightness}
)
data = self._send_receive(payload, getresponse=(not nowait))

if state["mode"] == "colour":
data = self.set_value( self.DPS_INDEX_BRIGHTNESS[self.bulb_type], brightness, nowait=nowait )
msg += 'No repsonse from bulb.'
elif state["mode"] == "colour":
# for colour mode use hsv to increase brightness
if self.bulb_type == "A":
value = brightness / 255.0
else:
value = brightness / 1000.0
(h, s, v) = self.colour_hsv()
data = self.set_hsv(h, s, value, nowait=nowait)
msg += 'No repsonse from bulb.'
else:
msg += "Device mode is not 'white' or 'colour', cannot set brightness."
else:
msg += 'Unknown bulb state.'

if data is not None or nowait is True:
return data
else:
return error_json(ERR_STATE, "set_brightness: Unknown bulb state.")
log.debug( msg )
return error_json(ERR_STATE, msg)

def set_colourtemp_percentage(self, colourtemp=100, nowait=False):
"""
Expand Down Expand Up @@ -588,11 +542,7 @@ def set_colourtemp(self, colourtemp, nowait=False):
"set_colourtemp: The colour temperature needs to be between 0 and 1000.",
)

payload = self.generate_payload(
CONTROL, {self.DPS_INDEX_COLOURTEMP[self.bulb_type]: colourtemp}
)
data = self._send_receive(payload, getresponse=(not nowait))
return data
return self.set_value( self.DPS_INDEX_COLOURTEMP[self.bulb_type], colourtemp, nowait=nowait )

def brightness(self):
"""Return brightness value"""
Expand Down

0 comments on commit 1f07086

Please sign in to comment.