Skip to content

Commit

Permalink
Add error handling and heartbeat keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonacox committed Oct 20, 2024
1 parent 438d88c commit f15cecc
Showing 1 changed file with 122 additions and 40 deletions.
162 changes: 122 additions & 40 deletions examples/multi-select.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,56 +48,138 @@
]
}

# Create array, devices, that is an array of tinytuya.Device objects
devices = []
for i in config["TuyaDevices"]:
# Settings
TTL_HEARTBEAT = 12 # Time in seconds between heartbeats

def create_device(i):
print(f"Connecting to {i['Device ID']} at {i['Address']} with key {i['Local Key']}")
d = tinytuya.Device(i["Device ID"], i["Address"], i["Local Key"], version=i["Version"])
devices.append(d) # Add the device to the devices array
device = tinytuya.Device(i["Device ID"], i["Address"], i["Local Key"], version=i["Version"])
return device

def getDeviceStatuses():
global devices
global statuses
def reconnect_device(device_info, index, statuses, cool_down_time=5):
"""
Attempts to reconnect the device after a cool-down period and update the statuses.
"""
time.sleep(cool_down_time) # Cool-down before reconnection

# Enable persistent socket connection for each device
for device in devices:
try:
print(f"Reconnecting to {device_info['Device ID']}...")

device = create_device(device_info)
device.set_socketPersistent(True)
# Call status() once to get the initial status and connect
initial_status = device.status()
device_id = device.id
index = devices.index(device)
print(f"INITIAL status from {device_id}: {initial_status}")
statuses[index] = {"id": device_id, "status": initial_status["dps"]}

# Check if we successfully retrieved a valid status
if "dps" in initial_status:
print(f"Reconnected and got status from {device.id}: {initial_status}")
statuses[index] = {"id": device.id, "status": initial_status["dps"]}
else:
raise Exception(f"Failed to get valid initial status after reconnect for {device.id}: {initial_status}")

return device
except Exception as e:
print(f"Failed to reconnect to {device_info['Device ID']}: {e}")
statuses[index] = {"id": device_info["Device ID"], "status": "Disconnected"}
return None

def send_heartbeat(device):
"""
Sends a heartbeat packet to keep the device connected.
"""
try:
# Send a heartbeat packet
device.heartbeat(nowait=True)
print(f"Sent heartbeat to {device.id}")
except Exception as e:
print(f"Failed to send heartbeat to {device.id}: {e}")

def getDeviceStatuses(devices, config):
statuses = [None] * len(devices) # Initialize statuses list to hold results for each device

# Enable persistent socket connection for each device
for index, device in enumerate(devices):
try:
device.set_socketPersistent(True)
initial_status = device.status()
if "dps" in initial_status:
print(f"INITIAL status from {device.id}: {initial_status}")
statuses[index] = {"id": device.id, "status": initial_status["dps"]}
else:
print(f"Failed to get initial status from {device.id}")
statuses[index] = {"id": device.id, "status": {}}
except Exception as e:
print(f"Error getting initial status from {device.id}: {e}")
statuses[index] = {"id": device.id, "status": {}}

# Create a list of sockets to monitor
sockets = [device.socket for device in devices]

last_heartbeat_time = time.time() # Track the last time a heartbeat was sent

# Infinite loop to listen for status updates using select
while True:
# Use select to wait for any of the device sockets to have data
readable, _, _ = select.select(sockets, [], [], 0.1)

# Process each socket with incoming data
for sock in readable:
# Find the corresponding device for this socket
device = next(device for device in devices if device.socket == sock)
updated_status = device.receive()

if updated_status:
print(f"UPDATE status from {device.id}: {updated_status}")
index = devices.index(device)
# We may only get one DPS, so just update that one item
if "dps" in updated_status:
for key in updated_status["dps"]:
statuses[index]["status"][key] = updated_status["dps"][key]
print(f" - Updated status for {device.id} DPS {key} to {updated_status['dps'][key]}")

# Add a small delay (optional) to prevent tight looping
time.sleep(0.1)

# Example usage
statuses = [None] * len(devices) # Initialize statuses list to hold results for each device
# Send a heartbeat every 5 seconds to all devices
if time.time() - last_heartbeat_time >= TTL_HEARTBEAT:
for device in devices:
send_heartbeat(device)
last_heartbeat_time = time.time() # Reset heartbeat timer

# Start the status listener
getDeviceStatuses()
# Use select to wait for any of the device sockets to have data
try:
readable, _, _ = select.select(sockets, [], [], 0.1)
except Exception as e:
print(f"Device disconnected: {e}")
# Find the invalid socket and remove it
for sock in sockets:
if sock.fileno() == -1:
# reconnect
device_info = config["TuyaDevices"][sockets.index(sock)]
new_device = reconnect_device(device_info, sockets.index(sock), statuses, cool_down_time=5)
if new_device:
devices[sockets.index(sock)] = new_device
sockets[sockets.index(sock)] = new_device.socket
else:
# Remove the invalid socket to avoid the negative file descriptor error
sockets.remove(sock)
continue

if readable:
# Process each socket with incoming data
for sock in readable:
# Find the corresponding device for this socket
device = next((d for d in devices if d.socket == sock), None)
if not device:
continue

updated_status = device.receive()

if updated_status:
print(f"UPDATE status from {device.id}: {updated_status}")
index = devices.index(device)
# We may only get one DPS, so just update that one item
if "dps" in updated_status:
for key in updated_status["dps"]:
statuses[index]["status"][key] = updated_status["dps"][key]
print(f" - Updated status for {device.id} DPS {key} to {updated_status['dps'][key]}")
else:
# Check if the device has disconnected
if not device.socket or device.socket.fileno() == -1:
# Device has disconnected
print(f"Device {device.id} has disconnected.")
# Reconnect logic with cool-down
device_info = config["TuyaDevices"][devices.index(device)]
new_device = reconnect_device(device_info, devices.index(device), statuses, cool_down_time=5)
if new_device:
devices[devices.index(device)] = new_device # Replace the disconnected device
sockets[devices.index(device)] = new_device.socket # Update the socket list
else:
# Remove the invalid socket to avoid the negative file descriptor error
sockets.remove(sock)
else:
print(f"Received empty status from {device.id}")

# Initialize devices
devices = [create_device(i) for i in config["TuyaDevices"]]

# Start the status listener
getDeviceStatuses(devices, config)

0 comments on commit f15cecc

Please sign in to comment.