From be11e232d228c41b0966b2d21e40959d1bcaedd8 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Nov 2024 10:29:41 -0500 Subject: [PATCH] fix: discard impossible timestamps when restoring bluetooth scanner data (#177) --- src/bluetooth_adapters/storage.py | 15 ++++- tests/test_init.py | 101 ++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/src/bluetooth_adapters/storage.py b/src/bluetooth_adapters/storage.py index a3c1b30..7c0e975 100644 --- a/src/bluetooth_adapters/storage.py +++ b/src/bluetooth_adapters/storage.py @@ -96,13 +96,26 @@ def expire_stale_scanner_discovered_device_advertisement_data( DISCOVERED_DEVICE_ADVERTISEMENT_DATAS ] for address, timestamp in timestamps.items(): - if now - timestamp > expire_seconds: + time_diff = now - timestamp + if time_diff > expire_seconds: + expire.append(address) + elif time_diff < 0: + _LOGGER.warning( + "Discarding timestamp %s for %s on scanner %s as it is the future (now = %s)", + timestamp, + address, + scanner, + now, + ) expire.append(address) for address in expire: del timestamps[address] del discovered_device_advertisement_datas[address] if not timestamps: expired_scanners.append(scanner) + _LOGGER.debug( + "Loaded %s fresh discovered devices for %s", len(timestamps), scanner + ) for scanner in expired_scanners: del data_by_scanner[scanner] diff --git a/tests/test_init.py b/tests/test_init.py index c82c753..08dbee1 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1793,6 +1793,107 @@ def test_expire_stale_scanner_discovered_device_advertisement_data(): assert "all_expired" not in data +@pytest.mark.skipif( + MessageType is None or get_dbus_managed_objects is None, + reason="dbus_fast is not available", +) +def test_expire_future_discovered_device_advertisement_data( + caplog: pytest.LogCaptureFixture, +) -> None: + """Test test_expire_future_discovered_device_advertisement_data.""" + now = time.time() + data = { + "myscanner": DiscoveredDeviceAdvertisementDataDict( + { + "connectable": True, + "discovered_device_advertisement_datas": { + "AA:BB:CC:DD:EE:FF": { + "advertisement_data": { + "local_name": "Test " "Device", + "manufacturer_data": {"76": "0215aabbccddeeff"}, + "rssi": -50, + "service_data": { + "0000180d-0000-1000-8000-00805f9b34fb": "00000000" + }, + "service_uuids": ["0000180d-0000-1000-8000-00805f9b34fb"], + "tx_power": 50, + "platform_data": ["Test Device", ""], + }, + "device": { + "address": "AA:BB:CC:DD:EE:FF", + "details": {"details": "test"}, + "name": "Test " "Device", + "rssi": -50, + }, + }, + "CC:DD:EE:FF:AA:BB": { + "advertisement_data": { + "local_name": "Test " "Device Expired", + "manufacturer_data": {"76": "0215aabbccddeeff"}, + "rssi": -50, + "service_data": { + "0000180d-0000-1000-8000-00805f9b34fb": "00000000" + }, + "service_uuids": ["0000180d-0000-1000-8000-00805f9b34fb"], + "tx_power": 50, + "platform_data": ["Test Device", ""], + }, + "device": { + "address": "CC:DD:EE:FF:AA:BB", + "details": {"details": "test"}, + "name": "Test " "Device Expired", + "rssi": -50, + }, + }, + }, + "discovered_device_timestamps": { + "AA:BB:CC:DD:EE:FF": now, + "CC:DD:EE:FF:AA:BB": now - 100, + }, + "expire_seconds": 100, + } + ), + "all_future": DiscoveredDeviceAdvertisementDataDict( + { + "connectable": True, + "discovered_device_advertisement_datas": { + "CC:DD:EE:FF:AA:BB": { + "advertisement_data": { + "local_name": "Test " "Device Expired", + "manufacturer_data": {"76": "0215aabbccddeeff"}, + "rssi": -50, + "service_data": { + "0000180d-0000-1000-8000-00805f9b34fb": "00000000" + }, + "service_uuids": ["0000180d-0000-1000-8000-00805f9b34fb"], + "tx_power": 50, + "platform_data": ["Test Device", ""], + }, + "device": { + "address": "CC:DD:EE:FF:AA:BB", + "details": {"details": "test"}, + "name": "Test " "Device Expired", + "rssi": -50, + }, + } + }, + "discovered_device_timestamps": {"CC:DD:EE:FF:AA:BB": now + 1000000}, + "expire_seconds": 100, + } + ), + } + expire_stale_scanner_discovered_device_advertisement_data(data) + assert len(data["myscanner"]["discovered_device_advertisement_datas"]) == 1 + assert ( + "CC:DD:EE:FF:AA:BB" + not in data["myscanner"]["discovered_device_advertisement_datas"] + ) + assert "all_future" not in data + assert ( + "for CC:DD:EE:FF:AA:BB on scanner all_future as it is the future" in caplog.text + ) + + def test_discovered_device_advertisement_data_from_dict_corrupt(caplog): """Test discovered_device_advertisement_data_from_dict with corrupt data.""" now = time.time()