diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6bbc304a..90b8374b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,12 +19,17 @@ jobs: uses: actions/setup-python@v4 with: python-version: 3.8 - - name: Install Forest dependencies + - name: Install Forest dependencies for Linux # required by librosa if: ${{ startsWith(matrix.os, 'ubuntu') }} run: | sudo apt-get update sudo apt-get install -y ffmpeg libsndfile1 + - name: Install Forest dependencies for Windows + # required by librosa + if: ${{ startsWith(matrix.os, 'windows') }} + uses: FedericoCarboni/setup-ffmpeg@v2 + id: setup-ffmpeg - name: Install Forest run: pip install -e . - name: Install dev dependencies diff --git a/.gitignore b/.gitignore index f5a3955b..904de1e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ __pycache__/ .DS_Store -# IntelliJ project files +# IntelliJ, VsCode project files .idea +.vscode # for installing Forest in editable mode when developing /forest.egg-info/ @@ -18,3 +19,6 @@ __pycache__/ #sphinx build docs/_build/ + +# any python environment files +.python-version \ No newline at end of file diff --git a/docs/source/logging.md b/docs/source/logging.md index 581f345b..a47d9e87 100644 --- a/docs/source/logging.md +++ b/docs/source/logging.md @@ -37,13 +37,6 @@ import logging logger = logging.getLogger(__name__) ``` -Or like this: - -``` -from logging import getLogger -logger = getLogger(__name__) -``` - ## 3. How to insert log messages into definitions Basic `logging` messages: diff --git a/forest/bonsai/simulate_gps_data.py b/forest/bonsai/simulate_gps_data.py index 1f0c6249..2e3b77a3 100644 --- a/forest/bonsai/simulate_gps_data.py +++ b/forest/bonsai/simulate_gps_data.py @@ -27,8 +27,8 @@ TRAVELLING_STATUS_LIST = range(11) -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) class PossibleExits(Enum): diff --git a/forest/constants.py b/forest/constants.py index c71fe71b..420d8c8d 100644 --- a/forest/constants.py +++ b/forest/constants.py @@ -20,12 +20,13 @@ class Frequency(Enum): """This class enumerates possible frequencies for summary data.""" - HOURLY = 1 - DAILY = 24 - HOURLY_AND_DAILY = "hourly_and_daily" - THREE_HOURLY = 3 - SIX_HOURLY = 6 - TWELVE_HOURLY = 12 + MINUTE = 1 + HOURLY = 60 + THREE_HOURLY = 3 * 60 + SIX_HOURLY = 6 * 60 + TWELVE_HOURLY = 12 * 60 + DAILY = 24 * 60 + HOURLY_AND_DAILY = -1 class OSMTags(Enum): diff --git a/forest/jasmine/data2mobmat.py b/forest/jasmine/data2mobmat.py index 5a86784c..6af0b201 100644 --- a/forest/jasmine/data2mobmat.py +++ b/forest/jasmine/data2mobmat.py @@ -15,8 +15,8 @@ TOLERANCE = 1e-6 -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) def cartesian( @@ -179,6 +179,10 @@ def collapse_data( # Filter out rows where the GPS accuracy is beyond # the provided accuracy_limit data = data[data.accuracy < accuracy_limit] + if data.shape[0] == 0: + raise ValueError( + f"No GPS record with accuracy less than {accuracy_limit}." 
+ ) # Get the start and end timestamps in seconds t_start = sorted(np.array(data.timestamp))[0] / 1000 @@ -1008,7 +1012,8 @@ def infer_mobmat(mobmat: np.ndarray, interval: float, r: float) -> np.ndarray: (mobmat, np.ones(n_rows).reshape(n_rows, 1)) ) # Append new pauses to the trajectory matrix - mobmat = np.vstack((mobmat, new_pauses_array)) + if new_pauses_array.shape[0] > 0: + mobmat = np.vstack((mobmat, new_pauses_array)) # Sort the matrix by start time mobmat = mobmat[mobmat[:, 3].argsort()].astype(float) diff --git a/forest/jasmine/mobmat2traj.py b/forest/jasmine/mobmat2traj.py index 1527a982..1ac1cbf2 100644 --- a/forest/jasmine/mobmat2traj.py +++ b/forest/jasmine/mobmat2traj.py @@ -3,7 +3,6 @@ """ import logging import math -import sys from typing import Optional, Tuple import numpy as np @@ -13,8 +12,8 @@ from .data2mobmat import great_circle_dist, exist_knot -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) # the details of the functions are in paper [Liu and Onnela (2020)] @@ -278,7 +277,7 @@ def indicate_flight( # Calculate k1 using the specified method k1 = calculate_k1(method, current_t, current_x, current_y, bv_subset, pars) if k1 is None: - sys.exit("Invalid method for calculate_k1.") + raise ValueError("Invalid method for calculate_k1.") # Select flight and pause indicators from the bv_subset flight_k = k1[bv_subset[:, 0] == 1] @@ -662,8 +661,8 @@ def forward_impute( method, start_t, start_x, start_y, flight_table, pars ) - if weight is None: - sys.exit("Invalid method for calculate_k1.") + if weight is None or len(weight) == 0: + raise ValueError("Invalid method for calculate_k1.") normalize_w = (weight + 1e-5) / float(sum(weight + 1e-5)) flight_index = np.random.choice(flight_table.shape[0], p=normalize_w) @@ -743,7 +742,7 @@ def forward_impute( pause_table, pars ) if weight is None: - sys.exit("Invalid method for calculate_k1.") + raise ValueError("Invalid method for calculate_k1.") normalize_w = (weight + 1e-5) / float(sum(weight + 1e-5)) pause_index = np.random.choice(pause_table.shape[0], p=normalize_w) @@ -832,7 +831,7 @@ def backward_impute( flight_table, pars ) if weight is None: - sys.exit("Invalid method for calculate_k1.") + raise ValueError("Invalid method for calculate_k1.") normalize_w = (weight + 1e-5) / float(sum(weight + 1e-5)) flight_index = np.random.choice(flight_table.shape[0], p=normalize_w) @@ -907,8 +906,8 @@ def backward_impute( method, end_t, end_x, end_y, pause_table, pars ) - if weight is None: - sys.exit("Invalid method for calculate_k1.") + if weight is None or len(weight) == 0: + raise ValueError("Invalid method for calculate_k1.") normalize_w = (weight + 1e-5) / float(sum(weight + 1e-5)) pause_index = np.random.choice(pause_table.shape[0], p=normalize_w) @@ -972,6 +971,11 @@ def impute_gps( # for observed flights, observed pauses, and missing intervals flight_table, pause_table, mis_table = create_tables(mob_mat, bv_subset) + if len(flight_table) == 0: + raise ValueError("No flight observed in the data.") + if len(pause_table) == 0: + raise ValueError("No pause observed in the data.") + # initialize the imputed trajectory table imp_table = np.zeros((1, 7)) diff --git a/forest/jasmine/sogp_gps.py b/forest/jasmine/sogp_gps.py index 86931e72..e5532e95 100644 --- a/forest/jasmine/sogp_gps.py +++ b/forest/jasmine/sogp_gps.py @@ -14,8 +14,8 @@ import numpy as np -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger() +logger = 
logging.getLogger(__name__) +logger.setLevel(logging.INFO) def calculate_k0(x1: np.ndarray, x2: np.ndarray, pars: list) -> float: diff --git a/forest/jasmine/tests/test_traj2stats.py b/forest/jasmine/tests/test_traj2stats.py index 2955e1e8..156bac89 100644 --- a/forest/jasmine/tests/test_traj2stats.py +++ b/forest/jasmine/tests/test_traj2stats.py @@ -654,7 +654,7 @@ def test_compute_window_size(sample_trajectory): """Testing window size is correct""" window, _ = compute_window_and_count( - sample_trajectory[0, 3], sample_trajectory[-1, 6], 1 + sample_trajectory[0, 3], sample_trajectory[-1, 6], 60 ) assert window == 3600 @@ -664,7 +664,7 @@ def test_compute_window_count(sample_trajectory): """Testing number of windows is correct""" _, num_windows = compute_window_and_count( - sample_trajectory[0, 3], sample_trajectory[-1, 6], 1 + sample_trajectory[0, 3], sample_trajectory[-1, 6], 60 ) assert num_windows == 24 @@ -674,7 +674,7 @@ def test_compute_window_size_6_hour(sample_trajectory): """Testing window size is correct 6 hour window""" window, _ = compute_window_and_count( - sample_trajectory[0, 3], sample_trajectory[-1, 6], 6 + sample_trajectory[0, 3], sample_trajectory[-1, 6], 360 ) assert window == 3600 * 6 @@ -684,7 +684,7 @@ def test_compute_window_count_6_hour(sample_trajectory): """Testing number of windows is correct 6 hour window""" _, num_windows = compute_window_and_count( - sample_trajectory[0, 3], sample_trajectory[-1, 6], 6 + sample_trajectory[0, 3], sample_trajectory[-1, 6], 360 ) assert num_windows == 4 diff --git a/forest/jasmine/traj2stats.py b/forest/jasmine/traj2stats.py index 7ec7d154..206792ac 100644 --- a/forest/jasmine/traj2stats.py +++ b/forest/jasmine/traj2stats.py @@ -32,8 +32,8 @@ from forest.utils import get_ids -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) @dataclass @@ -1125,8 +1125,8 @@ def gps_summaries( ValueError: Frequency is not valid """ - if frequency == Frequency.HOURLY_AND_DAILY: - raise ValueError("Frequency must be 'hourly' or 'daily'") + if frequency in [Frequency.HOURLY_AND_DAILY, Frequency.MINUTE]: + raise ValueError(f"Frequency cannot be {frequency.name.lower()}.") if frequency != Frequency.DAILY: parameters.split_day_night = False @@ -1161,7 +1161,7 @@ def gps_summaries( traj, [3, 4, 5], tz_str, 3600*24 ) window, num_windows = compute_window_and_count( - start_stamp, end_stamp, 24, parameters.split_day_night + start_stamp, end_stamp, 24*60, parameters.split_day_night ) if num_windows <= 0: @@ -1484,7 +1484,7 @@ def get_time_range( def compute_window_and_count( - start_stamp: int, end_stamp: int, window_hours: int, + start_stamp: int, end_stamp: int, window_minutes: int, split_day_night: bool = False ) -> Tuple[int, int]: """Computes the window and number of windows based on given time stamps. 
@@ -1492,7 +1492,7 @@ def compute_window_and_count( Args: start_stamp: int, starting time stamp end_stamp: int, ending time stamp - window_hours: int, window in hours + window_minutes: int, window in minutes split_day_night: bool, True if split day and night Returns: A tuple of two integers (window, num_windows): @@ -1500,7 +1500,7 @@ def compute_window_and_count( num_windows: int, number of windows """ - window = window_hours * 60 * 60 + window = window_minutes * 60 num_windows = (end_stamp - start_stamp) // window if split_day_night: num_windows *= 2 @@ -1561,7 +1561,8 @@ def gps_stats_main( Args: study_folder: str, the path of the study folder output_folder: str, the path of the folder - where you want to save results + where you want to save results. A folder named jasmine + will be created containing all output. tz_str: str, timezone frequency: Frequency, the frequency of the summary stats (resolution for summary statistics) @@ -1595,13 +1596,33 @@ def gps_stats_main( as pickle files for future use and a record csv file to show which users are processed and logger csv file to show warnings and bugs during the run + Raises: + ValueError: Frequency is not valid """ - - os.makedirs(output_folder, exist_ok=True) + # no minutely analysis on GPS data + if frequency == Frequency.MINUTE: + raise ValueError("Frequency cannot be minutely.") if parameters is None: parameters = Hyperparameters() + if frequency == Frequency.HOURLY_AND_DAILY: + frequencies = [Frequency.HOURLY, Frequency.DAILY] + else: + frequencies = [frequency] + + # Ensure that the correct output folder structures exist, centralize folder + # names. Note that frequencies + trajectory_folder = f"{output_folder}/trajectory" + logs_folder = f"{output_folder}/logs" + os.makedirs(output_folder, exist_ok=True) + os.makedirs(logs_folder, exist_ok=True) + for freq in frequencies: + os.makedirs(f"{output_folder}/{freq.name.lower()}", exist_ok=True) + if save_traj: + os.makedirs(trajectory_folder, exist_ok=True) + + # pars0 is passed to bv_select, pars1 to impute_gps pars0 = [ parameters.l1, parameters.l2, parameters.l3, parameters.a1, parameters.a2, parameters.b1, parameters.b2, parameters.b3 @@ -1614,24 +1635,20 @@ def gps_stats_main( # participant_ids should be a list of str if participant_ids is None: participant_ids = get_ids(study_folder) - # create a record of processed user participant_id and starting/ending time + # Create a record of processed participant_id and starting/ending time. + # These are updated and saved to disk after each participant is processed. 
+ all_memory_dict_file = f"{output_folder}/all_memory_dict.pkl" + all_bv_set_file = f"{output_folder}/all_bv_set.pkl" if all_memory_dict is None: all_memory_dict = {} for participant_id in participant_ids: all_memory_dict[str(participant_id)] = None - if all_bv_set is None: all_bv_set = {} for participant_id in participant_ids: all_bv_set[str(participant_id)] = None - if frequency == Frequency.HOURLY_AND_DAILY: - os.makedirs(f"{output_folder}/hourly", exist_ok=True) - os.makedirs(f"{output_folder}/daily", exist_ok=True) - if save_traj: - os.makedirs(f"{output_folder}/trajectory", exist_ok=True) - for participant_id in participant_ids: logger.info("User: %s", participant_id) # data quality check @@ -1677,6 +1694,7 @@ def gps_stats_main( params_w = np.mean(data.accuracy) else: params_w = parameters.w + # process data mobmat1 = gps_to_mobmat( data, parameters.itrvl, parameters.accuracylim, @@ -1694,6 +1712,8 @@ def gps_stats_main( ) all_bv_set[str(participant_id)] = bv_set = out_dict["BV_set"] all_memory_dict[str(participant_id)] = out_dict["memory_dict"] + + # impute_gps can fail, if so we skip this participant. try: imp_table = impute_gps( mobmat2, bv_set, parameters.method, @@ -1703,6 +1723,7 @@ def gps_stats_main( except RuntimeError as e: logger.error("Error: %s", e) continue + traj = imp_to_traj(imp_table, mobmat2, params_w) # raise error if traj coordinates are not in the range of # [-90, 90] and [-180, 180] @@ -1722,72 +1743,64 @@ def gps_stats_main( "[-90, 90] and [-180, 180]." ) # save all_memory_dict and all_bv_set - with open(f"{output_folder}/all_memory_dict.pkl", "wb") as f: + with open(all_memory_dict_file, "wb") as f: pickle.dump(all_memory_dict, f) - with open(f"{output_folder}/all_bv_set.pkl", "wb") as f: + with open(all_bv_set_file, "wb") as f: pickle.dump(all_bv_set, f) if save_traj is True: pd_traj = pd.DataFrame(traj) pd_traj.columns = ["status", "x0", "y0", "t0", "x1", "y1", "t1", "obs"] pd_traj.to_csv( - f"{output_folder}/trajectory/{participant_id}.csv", + f"{trajectory_folder}/{participant_id}.csv", index=False ) - if frequency == Frequency.HOURLY_AND_DAILY: - summary_stats1, logs1 = gps_summaries( - traj, - tz_str, - Frequency.HOURLY, - parameters, - places_of_interest, - osm_tags, - ) - write_all_summaries(participant_id, summary_stats1, - f"{output_folder}/hourly") - summary_stats2, logs2 = gps_summaries( - traj, - tz_str, - Frequency.DAILY, - parameters, - places_of_interest, - osm_tags, - ) - write_all_summaries(participant_id, summary_stats2, - f"{output_folder}/daily") - if parameters.save_osm_log: - os.makedirs(f"{output_folder}/logs", exist_ok=True) - with open( - f"{output_folder}/logs/locations_logs_hourly.json", - "w", - ) as hourly: - json.dump(logs1, hourly, indent=4) - with open( - f"{output_folder}/logs/locations_logs_daily.json", - "w", - ) as daily: - json.dump(logs2, daily, indent=4) - else: - summary_stats, logs = gps_summaries( - traj, - tz_str, - frequency, - parameters, - places_of_interest, - osm_tags, - ) - write_all_summaries( - participant_id, summary_stats, output_folder + + # generate summary stats. 
+ # (variable "frequency" is already declared in signature) + for freq in frequencies: + gps_stats_generate_summary( + traj=traj, + tz_str=tz_str, + frequency=freq, + participant_id=participant_id, + output_folder=f"{output_folder}/{freq.name.lower()}", + logs_folder=logs_folder, + parameters=parameters, + places_of_interest=places_of_interest, + osm_tags=osm_tags, ) - if parameters.save_osm_log: - os.makedirs(f"{output_folder}/logs", exist_ok=True) - with open( - f"{output_folder}/logs/locations_logs.json", - "w", - ) as loc: - json.dump(logs, loc, indent=4) else: logger.info( "GPS data are not collected" " or the data quality is too low" ) + + +def gps_stats_generate_summary( + traj: np.ndarray, + tz_str: str, + frequency: Frequency, + participant_id: str, + output_folder: str, + logs_folder: str, + parameters: Hyperparameters, + places_of_interest: Optional[list] = None, + osm_tags: Optional[List[OSMTags]] = None): + """This is simply the inner functionality of gps_stats_main. + Runs summaries code, writes to disk, saves logs if required. """ + summary_stats, logs = gps_summaries( + traj, + tz_str, + frequency, + parameters, + places_of_interest, + osm_tags, + ) + write_all_summaries(participant_id, summary_stats, output_folder) + if parameters.save_osm_log: + with open( + f"{logs_folder}/locations_logs_{frequency.name.lower()}.json", + "w", + ) as loc: + json.dump(logs, loc, indent=4) diff --git a/forest/oak/base.py b/forest/oak/base.py index 14ac0e49..f21ef82a 100644 --- a/forest/oak/base.py +++ b/forest/oak/base.py @@ -55,6 +55,13 @@ def preprocess_bout(t_bout: np.ndarray, x_bout: np.ndarray, y_bout: np.ndarray, - t_bout_interp: resampled timestamp (in Unix) - vm_bout_interp: vector magnitude of acceleration """ + + if ( + len(t_bout) < 2 or len(x_bout) < 2 or + len(y_bout) < 2 or len(z_bout) < 2 + ): + return np.array([]), np.array([]) + t_bout_interp = t_bout - t_bout[0] t_bout_interp = np.arange(t_bout_interp[0], t_bout_interp[-1], (1/fs)) t_bout_interp = t_bout_interp + t_bout[0] @@ -85,7 +92,8 @@ def preprocess_bout(t_bout: np.ndarray, x_bout: np.ndarray, y_bout: np.ndarray, z_bout_interp**2) # standardize measurement to gravity units (g) if its recorded in m/s**2 - if np.mean(vm_bout_interp) > 5: + # Also avoid a runtime warning of taking the mean of an empty slice + if vm_bout_interp.shape[0] > 0 and np.mean(vm_bout_interp) > 5: x_bout_interp = x_bout_interp/9.80665 y_bout_interp = y_bout_interp/9.80665 z_bout_interp = z_bout_interp/9.80665 @@ -137,8 +145,7 @@ def get_pp(vm_bout: np.ndarray, fs: int = 10) -> npt.NDArray[np.float64]: """ vm_res_sec = vm_bout.reshape((fs, -1), order="F") - pp = np.array([max(vm_res_sec[:, i])-min(vm_res_sec[:, i]) - for i in range(vm_res_sec.shape[1])]) + pp = np.ptp(vm_res_sec, axis=0) return pp @@ -450,24 +457,29 @@ def preprocess_dates( date.replace(tzinfo=from_zone).astimezone(to_zone) for date in dates ] # trim dataset according to time_start and time_end - if time_start is not None and time_end is not None: + if time_start is None or time_end is None: + dates_filtered = dates + else: time_min = datetime.strptime(time_start, fmt) time_min = time_min.replace(tzinfo=from_zone).astimezone(to_zone) time_max = datetime.strptime(time_end, fmt) time_max = time_max.replace(tzinfo=from_zone).astimezone(to_zone) - dates = [date for date in dates if time_min <= date <= time_max] + dates_filtered = [ + date for date in dates if time_min <= date <= time_max + ] dates_shifted = [date-timedelta(hours=date.hour) for date in dates] + # create time vector 
with days for analysis if time_start is None: - date_start = dates_shifted[0] + date_start = dates_filtered[0] date_start = date_start - timedelta(hours=date_start.hour) else: date_start = datetime.strptime(time_start, fmt) date_start = date_start.replace(tzinfo=from_zone).astimezone(to_zone) date_start = date_start - timedelta(hours=date_start.hour) if time_end is None: - date_end = dates_shifted[-1] + date_end = dates_filtered[-1] date_end = date_end - timedelta(hours=date_end.hour) else: date_end = datetime.strptime(time_end, fmt) @@ -478,7 +490,7 @@ def preprocess_dates( def run_hourly( - t_hours_pd: pd.Series, days_hourly: pd.DatetimeIndex, + t_hours_pd: pd.Series, t_ind_pydate: list, cadence_bout: np.ndarray, steps_hourly: np.ndarray, walkingtime_hourly: np.ndarray, cadence_hourly: np.ndarray, frequency: Frequency @@ -489,7 +501,7 @@ def run_hourly( Args: t_hours_pd: pd.Series timestamp of each measurement - days_hourly: pd.DatetimeIndex + t_ind_pydate: list list of days with hourly resolution cadence_bout: np.ndarray cadence of each measurement @@ -503,11 +515,10 @@ def run_hourly( summary statistics format, Frequency class at constants.py """ for t_unique in t_hours_pd.unique(): - t_ind_pydate = [t_ind.to_pydatetime() for t_ind in days_hourly] # get indexes of ranges of dates that contain t_unique ind_to_store = -1 for ind_to_store, t_ind in enumerate(t_ind_pydate): - if t_ind <= t_unique < t_ind + timedelta(hours=frequency.value): + if t_ind <= t_unique < t_ind + timedelta(minutes=frequency.value): break cadence_temp = cadence_bout[t_hours_pd == t_unique] cadence_temp = cadence_temp[cadence_temp > 0] @@ -555,13 +566,15 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, from_zone = tz.gettz('UTC') to_zone = tz.gettz(tz_str) if tz_str else from_zone + freq_str = frequency.name.lower() + # create folders to store results if frequency == Frequency.HOURLY_AND_DAILY: os.makedirs(os.path.join(output_folder, "daily"), exist_ok=True) os.makedirs(os.path.join(output_folder, "hourly"), exist_ok=True) else: os.makedirs( - os.path.join(output_folder, frequency.name.lower()), exist_ok=True + os.path.join(output_folder, freq_str), exist_ok=True ) if users is None: users = get_ids(study_folder) @@ -578,21 +591,38 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, ) days = pd.date_range(date_start, date_end, freq='D') - if (frequency == Frequency.HOURLY_AND_DAILY - or frequency == Frequency.HOURLY): - freq = 'H' - else: - freq = str(frequency.value) + 'H' - days_hourly = pd.date_range(date_start, date_end+timedelta(days=1), - freq=freq)[:-1] + # allocate memory steps_daily = np.full((len(days), 1), np.nan) cadence_daily = np.full((len(days), 1), np.nan) walkingtime_daily = np.full((len(days), 1), np.nan) - steps_hourly = np.full((len(days_hourly), 1), np.nan) - cadence_hourly = np.full((len(days_hourly), 1), np.nan) - walkingtime_hourly = np.full((len(days_hourly), 1), np.nan) + steps_hourly = np.full((1, 1), np.nan) + cadence_hourly = np.full((1, 1), np.nan) + walkingtime_hourly = np.full((1, 1), np.nan) + t_ind_pydate = pd.Series([], dtype='datetime64[ns]') + t_ind_pydate_str = None + + if frequency != Frequency.DAILY: + if ( + frequency == Frequency.HOURLY_AND_DAILY + or frequency == Frequency.HOURLY + ): + freq = 'H' + elif frequency == Frequency.MINUTE: + freq = 'T' + else: + freq = str(frequency.value/60) + 'H' + + days_hourly = pd.date_range(date_start, date_end+timedelta(days=1), + freq=freq)[:-1] + + steps_hourly = 
np.full((len(days_hourly), 1), np.nan) + cadence_hourly = np.full((len(days_hourly), 1), np.nan) + walkingtime_hourly = np.full((len(days_hourly), 1), np.nan) + + t_ind_pydate = days_hourly.to_pydatetime() + t_ind_pydate_str = t_ind_pydate.astype(str) for d_ind, d_datetime in enumerate(days): logger.info("Day: %d", d_ind) @@ -618,6 +648,8 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, z = np.array(data["z"], dtype="float64") # z-axis acc. # preprocess data fragment t_bout_interp, vm_bout = preprocess_bout(timestamp, x, y, z) + if len(t_bout_interp) == 0: # no valid data to process here + continue # find walking and estimate cadence cadence_bout = find_walking(vm_bout) # distribute metrics across hours @@ -628,14 +660,18 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, ] # transform t to full hours t_series = pd.Series(t_datetime) - t_hours_pd = t_series.dt.floor('H') + if frequency == Frequency.MINUTE: + t_hours_pd = t_series.dt.floor('T') + else: + t_hours_pd = t_series.dt.floor('H') + # convert t_hours to correct timezone t_hours_pd = t_hours_pd.dt.tz_localize( from_zone ).dt.tz_convert(to_zone) run_hourly( - t_hours_pd, days_hourly, cadence_bout, steps_hourly, + t_hours_pd, t_ind_pydate, cadence_bout, steps_hourly, walkingtime_hourly, cadence_hourly, frequency ) @@ -655,13 +691,12 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, 'walking_time': walkingtime_daily[:, -1], 'steps': steps_daily[:, -1], 'cadence': cadence_daily[:, -1]}) - output_file = user + "_gait_daily.csv" + output_file = user + ".csv" dest_path = os.path.join(output_folder, "daily", output_file) summary_stats.to_csv(dest_path, index=False) if frequency != Frequency.DAILY: summary_stats = pd.DataFrame({ - 'date': [date.strftime('%Y-%m-%d %H:%M:%S') - for date in days_hourly], + 'date': t_ind_pydate_str, 'walking_time': walkingtime_hourly[:, -1], 'steps': steps_hourly[:, -1], 'cadence': cadence_hourly[:, -1]}) @@ -669,6 +704,6 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, if frequency == Frequency.HOURLY_AND_DAILY: freq_name = "hourly" else: - freq_name = frequency.name.lower() + freq_name = freq_str dest_path = os.path.join(output_folder, freq_name, output_file) summary_stats.to_csv(dest_path, index=False) diff --git a/forest/oak/tests/test_run_hourly.py b/forest/oak/tests/test_run_hourly.py index cb6beb37..242b52bd 100644 --- a/forest/oak/tests/test_run_hourly.py +++ b/forest/oak/tests/test_run_hourly.py @@ -20,12 +20,12 @@ def sample_run_input(signal_bout): "2020-02-25 08:00:00-05:00", "2020-02-25 08:00:00-05:00" ], utc=True).tz_convert('US/Eastern')) - days_hourly = pd.date_range( + t_ind_pydate = pd.date_range( start='2020-02-24 00:00:00', end='2020-02-25 23:00:00', freq='H', tz='US/Eastern' - ) + ).to_pydatetime() cadence_bout = np.array( [1.65, 1.6, 1.55, 1.6, 1.55, 1.85, 1.8, 1.75, 1.75, 1.7] ) @@ -35,7 +35,7 @@ def sample_run_input(signal_bout): return ( t_hours_pd, - days_hourly, + t_ind_pydate, cadence_bout, steps_hourly, walkingtime_hourly, diff --git a/forest/sycamore/base.py b/forest/sycamore/base.py index 853ab2d8..86649926 100644 --- a/forest/sycamore/base.py +++ b/forest/sycamore/base.py @@ -91,6 +91,13 @@ def compute_survey_stats( Returns: True if successful, False otherwise """ + + if submits_timeframe not in [ + Frequency.HOURLY_AND_DAILY, Frequency.HOURLY, Frequency.DAILY + ]: + logger.error("Error: Invalid submits timeframe") + return False + os.makedirs(output_folder, 
exist_ok=True) os.makedirs(os.path.join(output_folder, "summaries"), exist_ok=True) os.makedirs(os.path.join(output_folder, "by_survey"), exist_ok=True) @@ -217,12 +224,16 @@ def compute_survey_stats( def get_submits_for_tableau( - study_folder: str, output_folder: str, config_path: str, - tz_str: str = "UTC", start_date: str = EARLIEST_DATE, - end_date: Optional[str] = None, users: Optional[List] = None, - interventions_filepath: Optional[str] = None, - submits_timeframe: Frequency = Frequency.DAILY, - history_path: Optional[str] = None + study_folder: str, + output_folder: str, + config_path: str, + tz_str: str = "UTC", + start_date: str = EARLIEST_DATE, + end_date: Optional[str] = None, + users: Optional[List] = None, + interventions_filepath: Optional[str] = None, + submits_timeframe: Frequency = Frequency.DAILY, + history_path: Optional[str] = None ) -> None: """Get survey submissions per day for integration into Tableau WDC @@ -240,8 +251,7 @@ def get_submits_for_tableau( end_date: The latest survey data to read in, in YYYY-MM-DD format users: - List of users in study for which we - are generating a survey schedule + List of users in study that we are generating a survey schedule for interventions_filepath: filepath where interventions json file is. submits_timeframe: @@ -250,58 +260,49 @@ def get_submits_for_tableau( history_path: Filepath to the survey history file. If this is not included, audio survey timings cannot be estimated. """ + if submits_timeframe not in [ + Frequency.HOURLY, Frequency.DAILY, Frequency.HOURLY_AND_DAILY + ]: + logger.error("Error: Invalid submits timeframe") + return + + if submits_timeframe == Frequency.HOURLY_AND_DAILY: + submits_timeframes = [Frequency.HOURLY, Frequency.DAILY] + else: + submits_timeframes = [submits_timeframe] + os.makedirs(output_folder, exist_ok=True) + for freq in submits_timeframes: + os.makedirs(f"{output_folder}/{freq.name.lower()}", exist_ok=True) if users is None: users = get_ids(study_folder) - if end_date is None: end_date = get_month_from_today() # Read, aggregate and clean data - else: - agg_data = aggregate_surveys_config( - study_folder, config_path, tz_str, users, start_date, - end_date, augment_with_answers=True, include_audio_surveys=True - ) - - if agg_data.shape[0] == 0: - logger.error("Error: No survey data found in %s", study_folder) - return - - # Create survey submits detail and summary - ss_detail = survey_submits( - config_path, start_date, end_date, - users, agg_data, interventions_filepath, history_path - ) + agg_data = aggregate_surveys_config( + study_folder, config_path, tz_str, users, start_date, + end_date, augment_with_answers=True, include_audio_surveys=True + ) - if ss_detail.shape[0] == 0: - logger.error("Error: no submission data found") - return + if agg_data.shape[0] == 0: + logger.error("Error: No survey data found in %s", study_folder) + return - if submits_timeframe == Frequency.HOURLY_AND_DAILY: - ss_summary_h = summarize_submits( - ss_detail, Frequency.HOURLY, False - ) - ss_summary_d = summarize_submits( - ss_detail, Frequency.DAILY, False - ) - - write_data_by_user(ss_summary_d, - os.path.join(output_folder, "both", "daily"), - users) - write_data_by_user(ss_summary_h, - os.path.join(output_folder, "both", "hourly"), - users) + # Create survey submits detail and summary + ss_detail = survey_submits( + config_path, start_date, end_date, + users, agg_data, interventions_filepath, history_path + ) - elif submits_timeframe == Frequency.HOURLY: - ss_summary_h = summarize_submits( - ss_detail, 
Frequency.HOURLY, False - ) - write_data_by_user(ss_summary_h, output_folder, users) + if ss_detail.shape[0] == 0: + logger.error("Error: no submission data found") + return - elif submits_timeframe == Frequency.DAILY: - ss_summary_d = summarize_submits( - ss_detail, Frequency.DAILY, False - ) - write_data_by_user(ss_summary_d, output_folder, users) + # run once for every submits_timeframe, per-user is handled internally + for freq in submits_timeframes: + ss_summary = summarize_submits(ss_detail, freq, False) + write_data_by_user( + ss_summary, f"{output_folder}/{freq.name.lower()}", users + ) diff --git a/forest/sycamore/read_audio.py b/forest/sycamore/read_audio.py index 5249b31b..84d91e55 100644 --- a/forest/sycamore/read_audio.py +++ b/forest/sycamore/read_audio.py @@ -136,7 +136,7 @@ def read_user_audio_recordings_stream( if valid_file: all_files.append(filename) all_durations.append(librosa.get_duration( - filename=os.path.join(audio_dir, survey, filename) + path=os.path.join(audio_dir, survey, filename) )) if len(all_files) == 0: diff --git a/forest/sycamore/utils.py b/forest/sycamore/utils.py index a4f365cd..9b69b70f 100644 --- a/forest/sycamore/utils.py +++ b/forest/sycamore/utils.py @@ -31,8 +31,7 @@ def get_month_from_today(): datetime.timedelta(31)).strftime("%Y-%m-%d") -def filename_to_timestamp(filename: str, tz_str: str = "UTC" - ) -> pd.Timestamp: +def filename_to_timestamp(filename: str, tz_str: str = "UTC") -> pd.Timestamp: """Extract a datetime from a filepath. Args: diff --git a/forest/willow/log_stats.py b/forest/willow/log_stats.py index a40e1be0..b797e981 100644 --- a/forest/willow/log_stats.py +++ b/forest/willow/log_stats.py @@ -17,8 +17,8 @@ ) -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) def text_analysis( @@ -304,9 +304,6 @@ def comm_logs_summaries( Returns: pandas dataframe of summary stats - - Raises: - ValueError: if frequency is not of correct type """ summary_stats = [] start_year, start_month, start_day, start_hour, _, _ = stamp2datetime( @@ -318,7 +315,9 @@ def comm_logs_summaries( # determine the starting and ending timestamp again based on the frequency if frequency == Frequency.HOURLY_AND_DAILY: - raise ValueError("frequency not of correct type") + logger.error( + "Error: frequency cannot be HOURLY_AND_DAILY for this function" + ) if frequency == Frequency.DAILY: table_start = datetime2stamp( @@ -337,7 +336,7 @@ def comm_logs_summaries( # determine the step size based on the frequency # step_size is in seconds - step_size = 3600 * frequency.value + step_size = 60 * frequency.value # for each chunk, calculate the summary statistics (colmean or count) for stamp in np.arange(table_start, table_end + 1, step=step_size): @@ -366,7 +365,7 @@ def comm_logs_summaries( if frequency == Frequency.DAILY: newline = [year, month, day] + newline else: - newline = [year, month, day, hour] + newline[:16] + newline = [year, month, day, hour] + newline[:17] summary_stats.append(newline) @@ -414,7 +413,7 @@ def log_stats_main( frequency: Frequency, time_start: Optional[List] = None, time_end: Optional[List] = None, - beiwe_id: Optional[List[str]] = None, + beiwe_ids: Optional[List[str]] = None, ) -> None: """Main function for calculating the summary statistics for the communication logs. 
@@ -427,123 +426,115 @@ determining resolution of the summary stats time_start: starting timestamp of the study time_end: ending timestamp of the study - beiwe_id: list of Beiwe IDs to be processed + beiwe_ids: list of Beiwe IDs to be processed """ - os.makedirs(output_folder, exist_ok=True) + + if frequency not in [ + Frequency.HOURLY_AND_DAILY, Frequency.DAILY, Frequency.HOURLY + ]: + logger.error( + "Error: frequency must be one of the following: " + "HOURLY_AND_DAILY, DAILY, HOURLY" + ) if frequency == Frequency.HOURLY_AND_DAILY: - os.makedirs(output_folder + "/hourly", exist_ok=True) - os.makedirs(output_folder + "/daily", exist_ok=True) + frequencies = [Frequency.HOURLY, Frequency.DAILY] + else: + frequencies = [frequency] + + os.makedirs(output_folder, exist_ok=True) + for freq in frequencies: + os.makedirs(f"{output_folder}/{freq.name.lower()}", exist_ok=True) # beiwe_id should be a list of str - if beiwe_id is None: - beiwe_id = [ - i for i in os.listdir(study_folder) - if os.path.isdir(f"{study_folder}/{i}") + if beiwe_ids is None: + beiwe_ids = [ + participant_id for participant_id in os.listdir(study_folder) + if os.path.isdir(f"{study_folder}/{participant_id}") ] - if len(beiwe_id) > 0: - for bid in beiwe_id: - logger.info("User: %s", bid) + # process the data for each participant at each frequency, writing results + # into a folder for the corresponding frequency. + for beiwe_id in beiwe_ids: + for freq in frequencies: + logger.info("(%s) Participant: %s", freq.name.lower(), beiwe_id) try: - # read data - text_data, text_stamp_start, text_stamp_end = read_data( - bid, study_folder, "texts", tz_str, time_start, time_end - ) - call_data, call_stamp_start, call_stamp_end = read_data( - bid, study_folder, "calls", tz_str, time_start, time_end + log_stats_inner( + beiwe_id, + f"{output_folder}/{freq.name.lower()}", + study_folder, + freq, + tz_str, + time_start, + time_end ) + except Exception as err: + logger.error("An error occurred when processing data: %s", err) - if text_data.shape[0] > 0 or call_data.shape[0] > 0: - # stamps from call and text should be the stamp_end - logger.info("Data imported ...") - stamp_start = min(text_stamp_start, call_stamp_start) - stamp_end = max(text_stamp_end, call_stamp_end) - - # process data - if frequency == Frequency.HOURLY_AND_DAILY: - stats_pdframe1 = comm_logs_summaries( - text_data, - call_data, - stamp_start, - stamp_end, - tz_str, - Frequency.HOURLY, - ) - stats_pdframe2 = comm_logs_summaries( - text_data, - call_data, - stamp_start, - stamp_end, - tz_str, - Frequency.DAILY, - ) - - write_all_summaries( - bid, stats_pdframe1, output_folder + "/hourly" - ) - write_all_summaries( - bid, stats_pdframe2, output_folder + "/daily" - ) - else: - stats_pdframe = comm_logs_summaries( - text_data, - call_data, - stamp_start, - stamp_end, - tz_str, - frequency, - ) - # num_uniq_individuals_call_or_text is the cardinality - # of the union of several sets. 
It should should always - be at least as large as the cardinality of any one of - the sets, and it should never be larger than the sum - of the cardinalities of all of the sets - (it may be equal if all the sets are disjoint) - sum_all_set_cols = pd.Series( - [0]*stats_pdframe.shape[0] - ) - for col in [ - "num_s_tel", "num_r_tel", "num_in_caller", - "num_out_caller", "num_mis_caller" - ]: - sum_all_set_cols += stats_pdframe[col] - if ( - stats_pdframe[ - "num_uniq_individuals_call_or_text" - ] < stats_pdframe[col] - ).any(): - logger.error( - "Error: " - "num_uniq_individuals_call_or_text " - "was found to be less than %s for at " - "least one time interval. This error " - "comes from an issue with the code," - " not an issue with the input data", - col - ) - if ( - stats_pdframe[ - "num_uniq_individuals_call_or_text" - ] > sum_all_set_cols - ).any(): - logger.error( - "Error: " - "num_uniq_individuals_call_or_text " - "was found to be larger than the sum " - "of individual cardinalities for at " - "least one time interval. This error " - "comes from an issue with the code," - " not an issue with the input data" - ) - - write_all_summaries(bid, stats_pdframe, output_folder) - - logger.info( - "Summary statistics obtained. Finished." - ) + logger.info("Summary statistics obtained. Finished.") - except Exception as err: - logger.error( - "An error occurred when processing the data: %s", err - ) + + +def log_stats_inner( + beiwe_id: str, + output_folder: str, + study_folder: str, + frequency: Frequency, + tz_str: str, + time_start: Optional[List] = None, + time_end: Optional[List] = None, +): + """ Inner functionality of log_stats_main """ + # read data + text_data, text_stamp_start, text_stamp_end = read_data( + beiwe_id, study_folder, "texts", tz_str, time_start, time_end + ) + call_data, call_stamp_start, call_stamp_end = read_data( + beiwe_id, study_folder, "calls", tz_str, time_start, time_end + ) + + # give up early if there is no data + if text_data.shape[0] <= 0 and call_data.shape[0] <= 0: + logger.info("There was no data for participant %s", beiwe_id) + return + + # stamps from call and text should be the stamp_end + logger.info("Data imported ...") + stamp_start = min(text_stamp_start, call_stamp_start) + stamp_end = max(text_stamp_end, call_stamp_end) + + # process the data + stats_pdframe = comm_logs_summaries( + text_data, call_data, stamp_start, stamp_end, tz_str, frequency + ) + + # num_uniq_individuals_call_or_text is the cardinality of the union of + # several sets. It should always be at least as large as the + # cardinality of any one of the sets, and it should never be larger than + # the sum of the cardinalities of all of the sets. (it may be equal if all + # the sets are disjoint) + num_uniq_column = "num_uniq_individuals_call_or_text" # legibility hax. + sum_all_set_cols = pd.Series([0]*stats_pdframe.shape[0]) + for column in [ + "num_s_tel", "num_r_tel", "num_in_caller", + "num_out_caller", "num_mis_caller" + ]: + sum_all_set_cols += stats_pdframe[column] + if (stats_pdframe[num_uniq_column] < stats_pdframe[column]).any(): + logger.error( + "Error: " + "num_uniq_individuals_call_or_text was found to be less than " + "%s for at least one time interval. 
This error comes from an " + "issue with the code, not an issue with the input data.", + column + ) + + if (stats_pdframe[num_uniq_column] > sum_all_set_cols).any(): + logger.error( + "Error: " + "num_uniq_individuals_call_or_text was found to be larger than the " + "sum of individual cardinalities for at least one time interval. " + "This error comes from an issue with the code, not an issue with " + "the input data." + ) + + write_all_summaries(beiwe_id, stats_pdframe, output_folder) diff --git a/forest/willow/tests/test_log_stats.py b/forest/willow/tests/test_log_stats.py index ddc40ed5..1a871fe5 100644 --- a/forest/willow/tests/test_log_stats.py +++ b/forest/willow/tests/test_log_stats.py @@ -17,6 +17,14 @@ def test_comm_log_summaries_with_empty_data(): assert isinstance(stats_pdframe, pd.DataFrame) +def test_comm_log_summaries_with_empty_data_hourly(): + text_data = pd.DataFrame.from_dict({}) + call_data = pd.DataFrame.from_dict({}) + stats_pdframe = comm_logs_summaries(text_data, call_data, STAMP_START, + STAMP_END, TZ_STR, Frequency.HOURLY) + assert isinstance(stats_pdframe, pd.DataFrame) + + def test_comm_log_summaries_with_empty_text_data(): text_data = pd.DataFrame.from_dict({}) call_data = pd.DataFrame.from_dict(