Skip to content

Commit

Permalink
Merge branch 'master' into BulbFix
Browse files Browse the repository at this point in the history
  • Loading branch information
uzlonewolf authored Aug 5, 2024
2 parents 17fd66a + 7d8922e commit 447ea78
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 22 deletions.
33 changes: 14 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ TinyTuya can also connect to the Tuya Cloud to poll status and issue commands to
# Example Usage of TinyTuya
import tinytuya

d = tinytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', 'LOCAL_KEY_HERE')
d.set_version(3.3)
d = tinytuya.Device('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', 'LOCAL_KEY_HERE', version=3.3)
data = d.status()
print('Device status: %r' % data)
```
Expand Down Expand Up @@ -276,8 +275,7 @@ import tinytuya
"""
OUTLET Device
"""
d = tinytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', 'LOCAL_KEY_HERE')
d.set_version(3.3)
d = tinytuya.Device('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', 'LOCAL_KEY_HERE', version=3.3)
data = d.status()
# Show status and state of first controlled switch on device
Expand Down Expand Up @@ -342,34 +340,31 @@ You can set up a persistent connection to a device and then monitor the state ch
```python
import tinytuya
d = tinytuya.OutletDevice('DEVICEID', 'DEVICEIP', 'DEVICEKEY')
d.set_version(3.3)
d.set_socketPersistent(True)
d = tinytuya.OutletDevice('DEVICEID', 'DEVICEIP', 'DEVICEKEY', version=3.3, persist=True)
print(" > Send Request for Status < ")
payload = d.generate_payload(tinytuya.DP_QUERY)
d.send(payload)
d.status(nowait=True)
print(" > Begin Monitor Loop <")
while(True):
# See if any data is available
data = d.receive()
print('Received Payload: %r' % data)
# Send keyalive heartbeat
print(" > Send Heartbeat Ping < ")
payload = d.generate_payload(tinytuya.HEART_BEAT)
d.send(payload)
# Send keep-alive heartbeat
if not data:
print(" > Send Heartbeat Ping < ")
d.heartbeat()
# NOTE If you are not seeing updates, you can force them - uncomment:
# print(" > Send Request for Status < ")
# payload = d.generate_payload(tinytuya.DP_QUERY)
# d.send(payload)
# d.status(nowait=True)
# NOTE Some smart plugs require an UPDATEDPS command to update power data
# print(" > Send DPS Update Request < ")
# payload = d.generate_payload(tinytuya.UPDATEDPS)
# d.send(payload)
```
### Tuya Cloud Access
Expand Down Expand Up @@ -565,9 +560,9 @@ In addition to the built-in `OutletDevice`, `BulbDevice` and `CoverDevice` devic
```python
# Example usage of community contributed device modules
from tinytuya import Contrib
from tinytuya.Contrib import ThermostatDevice
thermo = Contrib.ThermostatDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc' )
thermo = ThermostatDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc' )
```
## Tuya Data Points - DPS Table
Expand Down Expand Up @@ -869,9 +864,9 @@ NOTE (*) - Depending on the firmware, either 18/19/20/26/27 or 108/109/110/111/x
A user contributed module is available for this device in the [Contrib library](https://github.com/jasonacox/tinytuya/tree/master/tinytuya/Contrib):
```python
from tinytuya import Contrib
from tinytuya.Contrib import ThermostatDevice
thermo = Contrib.ThermostatDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc' )
thermo = ThermostatDevice( 'abcdefghijklmnop123456', '172.28.321.475', '1234567890123abc' )
```
For info on the Sensor Data lists, see https://github.com/jasonacox/tinytuya/discussions/139
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
cryptography>=3.1 # Encryption - AES can also be provided via PyCryptodome or pyaes or pyca/cryptography
requests # Used for Setup Wizard - Tuya IoT Platform calls
colorama # Makes ANSI escape character sequences work under MS Windows.
netifaces # Used to get the IP address of the local machine for scanning for devices.
#netifaces # Used to get the IP address of the local machine for scanning for devices, mainly useful for multi-interface machines.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
INSTALL_REQUIRES = [
'requests', # Used for Setup Wizard - Tuya IoT Platform calls
'colorama', # Makes ANSI escape character sequences work under MS Windows.
'netifaces', # Used for device discovery
#'netifaces', # Used for device discovery, mainly required on multi-interface machines
]

CHOOSE_CRYPTO_LIB = [
Expand Down
143 changes: 143 additions & 0 deletions tinytuya/Contrib/BlanketDevice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# TinyTuya Outlet Device
# -*- coding: utf-8 -*-
"""
Python module to interface with Tuya Electric Heating Blanket
Author: Leo Denham (https://github.com/leodenham)
Tested: Goldair Platinum Electric Blanket GPFAEB-Q
Local Control Classes
BlanketDevice(...)
See OutletDevice() for constructor arguments
Functions
BlanketDevice:
get_feet_level()
get_body_level()
set_feet_level()
set_body_level()
get_feet_time()
get_body_time()
set_feet_time()
set_body_time()
get_feet_countdown()
get_body_countdown()
Inherited
json = status() # returns json payload
set_version(version) # 3.1 [default] or 3.3
set_socketPersistent(False/True) # False [default] or True
set_socketNODELAY(False/True) # False or True [default]
set_socketRetryLimit(integer) # retry count limit [default 5]
set_socketTimeout(timeout) # set connection timeout in seconds [default 5]
set_dpsUsed(dps_to_request) # add data points (DPS) to request
add_dps_to_request(index) # add data point (DPS) index set to None
set_retry(retry=True) # retry if response payload is truncated
set_status(on, switch=1, nowait) # Set status of switch to 'on' or 'off' (bool)
set_value(index, value, nowait) # Set int value of any index.
heartbeat(nowait) # Send heartbeat to device
updatedps(index=[1], nowait) # Send updatedps command to device
turn_on(switch=1, nowait) # Turn on device / switch #
turn_off(switch=1, nowait) # Turn off
set_timer(num_secs, nowait) # Set timer for num_secs
set_debug(toggle, color) # Activate verbose debugging output
set_sendWait(num_secs) # Time to wait after sending commands before pulling response
detect_available_dps() # Return list of DPS available from device
generate_payload(command, data) # Generate TuyaMessage payload for command with data
send(payload) # Send payload to device (do not wait for response)
receive()
"""

from ..core import Device, error_json, ERR_RANGE


class BlanketDevice(Device):
"""
Represents a Tuya based Electric Blanket Device
"""
DPS = 'dps'
DPS_BODY_LEVEL = '14'
DPS_FEET_LEVEL = '15'
DPS_BODY_TIME = '16'
DPS_FEET_TIME = '17'
DPS_BODY_COUNTDOWN = '18'
DPS_FEET_COUNTDOWN = '19'
LEVEL_PREFIX = 'level_'

def _number_to_level(self, num):
return f'{self.LEVEL_PREFIX}{num+1}'

def _level_to_number(self, level):
return int(level.split(self.LEVEL_PREFIX)[1]) - 1

def get_feet_level(self, status_data=None):
if status_data is None:
status_data = self.status()

current = self._level_to_number(status_data[self.DPS][self.DPS_FEET_LEVEL])
return current

def get_body_level(self, status_data=None):
if status_data is None:
status_data = self.status()

current = self._level_to_number(status_data[self.DPS][self.DPS_BODY_LEVEL])
return current

def set_feet_level(self, num):
if (num < 0 or num > 6):
return error_json(
ERR_RANGE, "set_feet_level: The value for the level needs to be between 0 and 6."
)
return self.set_value(self.DPS_FEET_LEVEL, self._number_to_level(num))

def set_body_level(self, num):
if (num < 0 or num > 6):
return error_json(
ERR_RANGE, "set_body_level: The value for the level needs to be between 0 and 6."
)
return self.set_value(self.DPS_BODY_LEVEL, self._number_to_level(num))

def get_feet_time(self, status_data=None):
if status_data is None:
status_data = self.status()

current = status_data[self.DPS][self.DPS_FEET_TIME]
return current.replace('h', '')

def get_body_time(self, status_data=None):
if status_data is None:
status_data = self.status()

current = status_data[self.DPS][self.DPS_BODY_TIME]
return current.replace('h', '')

def set_feet_time(self, num):
if (num < 1 or num > 12):
return error_json(
ERR_RANGE, "set_feet_time: The value for the time needs to be between 1 and 12."
)
return self.set_value(self.DPS_FEET_TIME, f"{num}h")

def set_body_time(self, num):
if (num < 1 or num > 12):
return error_json(
ERR_RANGE, "set_body_time: The value for the time needs to be between 1 and 12."
)
return self.set_value(self.DPS_BODY_TIME, f"{num}h")

def get_feet_countdown(self, status_data=None):
if status_data is None:
status_data = self.status()

current = status_data[self.DPS][self.DPS_FEET_COUNTDOWN]
return current

def get_body_countdown(self, status_data=None):
if status_data is None:
status_data = self.status()

current = status_data[self.DPS][self.DPS_BODY_COUNTDOWN]
return current

21 changes: 21 additions & 0 deletions tinytuya/Contrib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,27 @@ In addition to the built-in `OutletDevice`, `BulbDevice` and `CoverDevice` devic
time.sleep(20)
```

### BlanketDevice

* BlanketDevice - A community-contributed Python module to add support for Tuya WiFi smart electric blankets
* Author: [Leo Denham](https://github.com/leodenham)
* Tested: [Goldair Platinum Electric Blanket GPFAEB-Q](https://www.target.com.au/p/goldair-platinum-electric-blanket-gpfaeb-q/8300270020_white)

```python
from tinytuya.Contrib import BlanketDevice
import time

device = BlanketDevice.BlanketDevice(dev_id="XXXX", address="Y.Y.Y.Y", local_key="ZZZZ", version=3.3)

device.turn_on()

# Heat up for 20 minutes then maintain nice temperature overnight.
device.set_body_level(6)
time.sleep(60*20)
device.set_body_level(2)
device.set_body_time(12)
```

## Submit Your Device

* We welcome new device modules!
Expand Down
6 changes: 5 additions & 1 deletion tinytuya/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,11 @@ def send_discovery_request( iface_list=None ):
if 'socket' not in iface:
iface['socket'] = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
iface['socket'].setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
iface['socket'].bind( (address,0) )
try:
iface['socket'].bind( (address,0) )
except:
log.debug( 'Failed to bind to address %r for discovery broadcasts, skipping interface!', address, exc_info=True )
continue

if 'payload' not in iface:
bcast = json.dumps( {"from":"app","ip":address} ).encode()
Expand Down

0 comments on commit 447ea78

Please sign in to comment.