Skip to content

Commit

Permalink
feat:suport heart_rate in export gpx file (#529)
Browse files Browse the repository at this point in the history
* feat:suport heart_rate in export gpx file
  • Loading branch information
zhaohongxuan authored Oct 29, 2023
1 parent 23464b5 commit 272c081
Showing 1 changed file with 102 additions and 17 deletions.
119 changes: 102 additions & 17 deletions run_page/keep_sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
import base64
import json
import math
import os
import time
import zlib
Expand All @@ -16,12 +15,16 @@
from Crypto.Cipher import AES
from generator import Generator
from utils import adjust_time
import xml.etree.ElementTree as ET

# need to test
LOGIN_API = "https://api.gotokeep.com/v1.1/users/login"
RUN_DATA_API = "https://api.gotokeep.com/pd/v3/stats/detail?dateUnit=all&type=running&lastDate={last_date}"
RUN_LOG_API = "https://api.gotokeep.com/pd/v3/runninglog/{run_id}"

HR_FRAME_THRESHOLD_IN_DECISECOND = 100 # Maximum time difference to consider a data point as the nearest, the unit is decisecond(分秒)

TIMESTAMP_THRESHOLD_IN_DECISECOND = 3_600_000 # Threshold for target timestamp adjustment, the unit of timestamp is decisecond(分秒), so the 3_600_000 stands for 100 hours sports time. 100h = 100 * 60 * 60 * 10

# If your points need trans from gcj02 to wgs84 coordinate which use by Mapbox
TRANS_GCJ02_TO_WGS84 = True
Expand Down Expand Up @@ -88,6 +91,17 @@ def parse_raw_data_to_nametuple(
keep_id = run_data["id"].split("_")[1]

start_time = run_data["startTime"]
avg_heart_rate = None
decoded_hr_data = []
if run_data["heartRate"]:
avg_heart_rate = run_data["heartRate"].get("averageHeartRate", None)
heart_rate_data = run_data["heartRate"].get("heartRates", None)
if heart_rate_data is not None:
decoded_hr_data = decode_runmap_data(heart_rate_data)
# fix #66
if avg_heart_rate and avg_heart_rate < 0:
avg_heart_rate = None

if run_data["geoPoints"]:
run_points_data = decode_runmap_data(run_data["geoPoints"], True)
run_points_data_gpx = run_points_data
Expand All @@ -99,20 +113,20 @@ def parse_raw_data_to_nametuple(
for i, p in enumerate(run_points_data_gpx):
p["latitude"] = run_points_data[i][0]
p["longitude"] = run_points_data[i][1]
else:
run_points_data = [[p["latitude"], p["longitude"]] for p in run_points_data]

for p in run_points_data_gpx:
p_hr = find_nearest_hr(decoded_hr_data, int(p["timestamp"]), start_time)
if p_hr:
p["hr"] = p_hr
if with_download_gpx:
if str(keep_id) not in old_gpx_ids:
if (
str(keep_id) not in old_gpx_ids
and run_data["dataType"] == "outdoorRunning"
):
gpx_data = parse_points_to_gpx(run_points_data_gpx, start_time)
download_keep_gpx(gpx_data, str(keep_id))
else:
print(f"ID {keep_id} no gps data")
heart_rate = None
if run_data["heartRate"]:
heart_rate = run_data["heartRate"].get("averageHeartRate", None)
# fix #66
if heart_rate and heart_rate < 0:
heart_rate = None
polyline_str = polyline.encode(run_points_data) if run_points_data else ""
start_latlng = start_point(*run_points_data[0]) if run_points_data else None
start_date = datetime.utcfromtimestamp(start_time / 1000)
Expand All @@ -133,7 +147,7 @@ def parse_raw_data_to_nametuple(
"start_date_local": datetime.strftime(start_date_local, "%Y-%m-%d %H:%M:%S"),
"end_local": datetime.strftime(end_local, "%Y-%m-%d %H:%M:%S"),
"length": run_data["distance"],
"average_heartrate": int(heart_rate) if heart_rate else None,
"average_heartrate": int(avg_heart_rate) if avg_heart_rate else None,
"map": run_map(polyline_str),
"start_latlng": start_latlng,
"distance": run_data["distance"],
Expand Down Expand Up @@ -172,18 +186,34 @@ def get_all_keep_tracks(email, password, old_tracks_ids, with_download_gpx=False


def parse_points_to_gpx(run_points_data, start_time):
# future to support heart rate
"""
Convert run points data to GPX format.
Args:
run_id (str): The ID of the run.
run_points_data (list of dict): A list of run data points.
start_time (int): The start time for adjusting timestamps. Note that the unit of the start_time is millsecond
Returns:
gpx_data (str): GPX data in string format.
"""
points_dict_list = []
# early timestamp fields in keep's data stands for delta time, but in newly data timestamp field stands for exactly time,
# so it does'nt need to plus extra start_time
if run_points_data[0]["timestamp"] > TIMESTAMP_THRESHOLD_IN_DECISECOND:
start_time = 0

for point in run_points_data:
points_dict = {
"latitude": point["latitude"],
"longitude": point["longitude"],
"time": datetime.utcfromtimestamp(
(point["timestamp"] * 100 + start_time) / 1000
(point["timestamp"] * 100 + start_time)
/ 1000 # note that the timestamp of a point is decisecond(分秒)
),
"elevation": point.get("verticalAccuracy"),
"hr": point.get("hr"),
}
if "verticalAccuracy" in point:
points_dict["elevation"] = point["verticalAccuracy"]
points_dict_list.append(points_dict)
gpx = gpxpy.gpx.GPX()
gpx.nsmap["gpxtpx"] = "http://www.garmin.com/xmlschemas/TrackPointExtension/v1"
Expand All @@ -195,12 +225,67 @@ def parse_points_to_gpx(run_points_data, start_time):
gpx_segment = gpxpy.gpx.GPXTrackSegment()
gpx_track.segments.append(gpx_segment)
for p in points_dict_list:
point = gpxpy.gpx.GPXTrackPoint(**p)
point = gpxpy.gpx.GPXTrackPoint(
latitude=p["latitude"],
longitude=p["longitude"],
time=p["time"],
elevation=p.get("elevation"),
)
if p.get("hr") is not None:
gpx_extension_hr = ET.fromstring(
f"""<gpxtpx:TrackPointExtension xmlns:gpxtpx="http://www.garmin.com/xmlschemas/TrackPointExtension/v1">
<gpxtpx:hr>{p["hr"]}</gpxtpx:hr>
</gpxtpx:TrackPointExtension>
"""
)
point.extensions.append(gpx_extension_hr)
gpx_segment.points.append(point)

return gpx.to_xml()


def find_nearest_hr(
hr_data_list, target_time, start_time, threshold=HR_FRAME_THRESHOLD_IN_DECISECOND
):
"""
Find the nearest heart rate data point to the target time.
if cannot found suitable HR data within the specified time frame (within 10 seconds by default), there will be no hr data return
Args:
heart_rate_data (list of dict): A list of heart rate data points, where each point is a dictionary
containing at least "timestamp" and "beatsPerMinute" keys.
target_time (float): The target timestamp for which to find the nearest heart rate data point. Please Note that the unit of target_time is decisecond(分秒),
means 1/10 of a second ,this is very unsual!! so when we convert a target_time to second we need to divide by 10, and when we convert a target time to millsecond
we need to times 100.
start_time (float): The reference start time. the unit of start_time is normal millsecond timestamp
threshold (float, optional): The maximum allowed time difference to consider a data point as the nearest.
Default is HR_THRESHOLD, the unit is decisecond(分秒)
Returns:
int or None: The heart rate value of the nearest data point, or None if no suitable data point is found.
"""
closest_element = None
# init difference value
min_difference = float("inf")
if target_time > TIMESTAMP_THRESHOLD_IN_DECISECOND:
target_time = (
target_time * 100 - start_time
) / 100 # note that the unit of target_time is decisecond(分秒) and the unit of start_time is normal millsecond

for item in hr_data_list:
timestamp = item["timestamp"]
difference = abs(timestamp - target_time)

if difference <= threshold and difference < min_difference:
closest_element = item
min_difference = difference

if closest_element:
hr = closest_element.get("beatsPerMinute")
if hr and hr > 0:
return hr

return None


def download_keep_gpx(gpx_data, keep_id):
try:
print(f"downloading keep_id {str(keep_id)} gpx")
Expand Down

1 comment on commit 272c081

@vercel
Copy link

@vercel vercel bot commented on 272c081 Oct 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

running-page – ./

running-page-yihong0618.vercel.app
running-page-git-master-yihong0618.vercel.app
running-page.vercel.app

Please sign in to comment.