Skip to content

Commit

Permalink
improve & test
Browse files Browse the repository at this point in the history
  • Loading branch information
hishizuka committed Oct 15, 2023
1 parent eb98252 commit dd5ceef
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions modules/helper/ble_gatt_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class GadgetbridgeService(Service):
value = bytearray()
value_extend = False

timestamp_str = ""
timestamp_status = False
timestamp_bytes = bytearray()
timestamp_extend = False
timestamp_done = False

def __init__(self, config):
Expand All @@ -52,50 +52,47 @@ def send_message(self, value):
self.tx_characteristic.changed(bytes(value + "\\n\n", "utf-8"))

def atob(self, matchobj):
r = base64.b64decode(matchobj.group(1)).decode()
try:
r = base64.b64decode(matchobj.group(1)).decode("utf-8", "ignore")
except:
r = ""
return f'"{r}"'

# receive from central
@characteristic(rx_characteristic_uuid, CharFlags.WRITE).setter
def rx_characteristic(self, value, options):
app_logger.debug(value)
if value[0] == 0x10:
self.value = bytearray()
self.value_extend = True

self.timestamp_status = False
if (not self.timestamp_done or self.timestamp_status) and value[
1:8
].decode() == "setTime":
self.timestamp_str = value.decode()[1:]
self.timestamp_status = True

elif self.timestamp_status:
self.timestamp_str += value.decode()
# initialize
if value.startswith(b'\x10GB({"') and not len(self.value):
self.value_extend = True
elif value.startswith(b'\x10setTime') and not self.timestamp_done and not len(self.timestamp_bytes):
self.timestamp_extend = True

# expand
if self.timestamp_extend:
self.timestamp_bytes.extend(bytearray(value))
elif self.value_extend:
self.value.extend(bytearray(value))

# finalize: timestamp
if self.timestamp_extend:
res = re.match(
"^setTime\((\d+)\);E.setTimeZone\((\S+)\)", self.timestamp_str
"^setTime\((\d+)\);E.setTimeZone\((\S+)\)", self.timestamp_bytes.decode("utf-8", "ignore")[1:]
)
if res is not None:
self.timestamp_done = True
self.timestamp_status = False
time_diff = datetime.timedelta(hours=float(res.group(2)))
self.config.logger.sensor.sensor_gps.set_timediff_from_utc(time_diff)
utctime = datetime.datetime.fromtimestamp(int(res.group(1))) - time_diff
self.config.logger.sensor.sensor_gps.get_utc_time_from_datetime(utctime)
self.timestamp_done = True
self.timestamp_extend = False
self.timestamp_bytes = bytearray()

if self.value_extend:
self.value.extend(bytearray(value))

# for gadgetbridge JSON message
if (
len(self.value) > 8
and self.value[-3] == 0x7D
and self.value[-2] == 0x29
and self.value[-1] == 0x0A
):
# finalize: gadgetbridge JSON message
elif len(self.value) > 8 and self.value.endswith(b'})\n'):
# remove emoji
text_mod = re.sub(":\w+:", "", self.value.decode().strip()[5:-2])
text_mod = re.sub(":\w+:", "", self.value.decode("utf-8", "ignore").strip()[5:-2])

# decode base64
b64_decode_ptn = re.compile(r"atob\(\"(\S+)\"\)")
Expand All @@ -106,7 +103,7 @@ def rx_characteristic(self, value, options):
message = json.loads("{" + text_mod + "}", strict=False)
except json.JSONDecodeError:
app_logger.exception("failed to load json")
app_logger.exception(self.value.decode().strip()[5:-2])
app_logger.exception(self.value)
app_logger.exception(text_mod)

if (
Expand All @@ -118,7 +115,7 @@ def rx_characteristic(self, value, options):
self.config.gui.show_message(
message["title"], message["body"], limit_length=True
)
app_logger.info(f"success: {message}")
app_logger.info(message)
elif (
"t" in message
and len(message["t"]) >= 4
Expand All @@ -138,9 +135,22 @@ def rx_characteristic(self, value, options):
and "distance" in message
and "action" in message
):
app_logger.info(f"{message['instr']}, {message['distance']}, {message['action']},")
# action
# "","continue",
# "left", "left_slight", "left_sharp", "right", "right_slight", "right_sharp",
# "keep_left", "keep_right", "uturn_left", "uturn_right",
# "offroute",
# "roundabout_right", "roundabout_left", "roundabout_straight", "roundabout_uturn",
# "finish"
app_logger.info(message)
# blank distance: skip
# eta?
app_logger.info(f"{len(self.value)}, {len(self.timestamp_bytes)}")
#msg = f"{message['distance']}, {message['action']}\n{message['instr']}"
#self.config.gui.show_forced_message(msg)

self.value_extend = False
self.value = bytearray()

async def on_off_uart_service(self):
if self.status:
Expand Down

0 comments on commit dd5ceef

Please sign in to comment.