diff --git a/.gitignore b/.gitignore index f5a3955b..904de1e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ __pycache__/ .DS_Store -# IntelliJ project files +# IntelliJ, VsCode project files .idea +.vscode # for installing Forest in editable mode when developing /forest.egg-info/ @@ -18,3 +19,6 @@ __pycache__/ #sphinx build docs/_build/ + +# any python environment files +.python-version \ No newline at end of file diff --git a/forest/jasmine/traj2stats.py b/forest/jasmine/traj2stats.py index e25f653c..a67388af 100644 --- a/forest/jasmine/traj2stats.py +++ b/forest/jasmine/traj2stats.py @@ -1561,7 +1561,8 @@ def gps_stats_main( Args: study_folder: str, the path of the study folder output_folder: str, the path of the folder - where you want to save results + where you want to save results. A folder named jasmine + will be created containing all output. tz_str: str, timezone frequency: Frequency, the frequency of the summary stats (resolution for summary statistics) @@ -1598,16 +1599,30 @@ def gps_stats_main( Raises: ValueError: Frequency is not valid """ - # no minutely analysis on GPS data if frequency == Frequency.MINUTE: raise ValueError("Frequency cannot be minutely.") - os.makedirs(output_folder, exist_ok=True) - if parameters is None: parameters = Hyperparameters() + if frequency == Frequency.HOURLY_AND_DAILY: + frequencies = [Frequency.HOURLY, Frequency.DAILY] + else: + frequencies = [frequency] + + # Ensure that the correct output folder structures exist, centralize folder + # names. Note that frequencies + trajectory_folder = f"{output_folder}/trajectory" + logs_folder = f"{output_folder}/logs" + os.makedirs(output_folder, exist_ok=True) + os.makedirs(logs_folder, exist_ok=True) + for freq in frequencies: + os.makedirs(f"{output_folder}/{freq.name.lower()}", exist_ok=True) + if save_traj: + os.makedirs(trajectory_folder, exist_ok=True) + + # pars0 is passed to bv_select, pars1 to impute_gps pars0 = [ parameters.l1, parameters.l2, parameters.l3, parameters.a1, parameters.a2, parameters.b1, parameters.b2, parameters.b3 @@ -1620,24 +1635,20 @@ def gps_stats_main( # participant_ids should be a list of str if participant_ids is None: participant_ids = get_ids(study_folder) - # create a record of processed user participant_id and starting/ending time + # Create a record of processed participant_id and starting/ending time. + # These are updated and saved to disk after each participant is processed. + all_memory_dict_file = f"{output_folder}/all_memory_dict.pkl" + all_bv_set_file = f"{output_folder}/all_bv_set.pkl" if all_memory_dict is None: all_memory_dict = {} for participant_id in participant_ids: all_memory_dict[str(participant_id)] = None - if all_bv_set is None: all_bv_set = {} for participant_id in participant_ids: all_bv_set[str(participant_id)] = None - if frequency == Frequency.HOURLY_AND_DAILY: - os.makedirs(f"{output_folder}/hourly", exist_ok=True) - os.makedirs(f"{output_folder}/daily", exist_ok=True) - if save_traj: - os.makedirs(f"{output_folder}/trajectory", exist_ok=True) - for participant_id in participant_ids: logger.info("User: %s", participant_id) # data quality check @@ -1664,6 +1675,7 @@ def gps_stats_main( params_w = np.mean(data.accuracy) else: params_w = parameters.w + # process data mobmat1 = gps_to_mobmat( data, parameters.itrvl, parameters.accuracylim, @@ -1681,6 +1693,8 @@ def gps_stats_main( ) all_bv_set[str(participant_id)] = bv_set = out_dict["BV_set"] all_memory_dict[str(participant_id)] = out_dict["memory_dict"] + + # impute_gps can fail, if so we skip this participant. try: imp_table = impute_gps( mobmat2, bv_set, parameters.method, @@ -1690,6 +1704,7 @@ def gps_stats_main( except RuntimeError as e: logger.error("Error: %s", e) continue + traj = imp_to_traj(imp_table, mobmat2, params_w) # raise error if traj coordinates are not in the range of # [-90, 90] and [-180, 180] @@ -1709,72 +1724,64 @@ def gps_stats_main( "[-90, 90] and [-180, 180]." ) # save all_memory_dict and all_bv_set - with open(f"{output_folder}/all_memory_dict.pkl", "wb") as f: + with open(all_memory_dict_file, "wb") as f: pickle.dump(all_memory_dict, f) - with open(f"{output_folder}/all_bv_set.pkl", "wb") as f: + with open(all_bv_set_file, "wb") as f: pickle.dump(all_bv_set, f) if save_traj is True: pd_traj = pd.DataFrame(traj) pd_traj.columns = ["status", "x0", "y0", "t0", "x1", "y1", "t1", "obs"] pd_traj.to_csv( - f"{output_folder}/trajectory/{participant_id}.csv", + f"{trajectory_folder}/{participant_id}.csv", index=False ) - if frequency == Frequency.HOURLY_AND_DAILY: - summary_stats1, logs1 = gps_summaries( - traj, - tz_str, - Frequency.HOURLY, - parameters, - places_of_interest, - osm_tags, - ) - write_all_summaries(participant_id, summary_stats1, - f"{output_folder}/hourly") - summary_stats2, logs2 = gps_summaries( - traj, - tz_str, - Frequency.DAILY, - parameters, - places_of_interest, - osm_tags, - ) - write_all_summaries(participant_id, summary_stats2, - f"{output_folder}/daily") - if parameters.save_osm_log: - os.makedirs(f"{output_folder}/logs", exist_ok=True) - with open( - f"{output_folder}/logs/locations_logs_hourly.json", - "w", - ) as hourly: - json.dump(logs1, hourly, indent=4) - with open( - f"{output_folder}/logs/locations_logs_daily.json", - "w", - ) as daily: - json.dump(logs2, daily, indent=4) - else: - summary_stats, logs = gps_summaries( - traj, - tz_str, - frequency, - parameters, - places_of_interest, - osm_tags, - ) - write_all_summaries( - participant_id, summary_stats, output_folder + + # generate summary stats. + # (variable "frequency" is already declared in signature) + for freq in frequencies: + gps_stats_generate_summary( + traj=traj, + tz_str=tz_str, + frequency=freq, + participant_id=participant_id, + output_folder=f"{output_folder}/{freq.name.lower()}", + logs_folder=logs_folder, + parameters=parameters, + places_of_interest=places_of_interest, + osm_tags=osm_tags, ) - if parameters.save_osm_log: - os.makedirs(f"{output_folder}/logs", exist_ok=True) - with open( - f"{output_folder}/logs/locations_logs.json", - "w", - ) as loc: - json.dump(logs, loc, indent=4) else: logger.info( "GPS data are not collected" " or the data quality is too low" ) + + +def gps_stats_generate_summary( + traj: np.ndarray, + tz_str: str, + frequency: Frequency, + participant_id: str, + output_folder: str, + logs_folder: str, + parameters: Hyperparameters, + places_of_interest: Optional[list] = None, + osm_tags: Optional[List[OSMTags]] = None): + """This is simply the inner functionality of gps_stats_main. + Runs summaries code, writes to disk, saves logs if required. """ + summary_stats, logs = gps_summaries( + traj, + tz_str, + frequency, + parameters, + places_of_interest, + osm_tags, + ) + write_all_summaries(participant_id, summary_stats, output_folder) + if parameters.save_osm_log: + with open( + f"{logs_folder}/locations_logs_{frequency.name.lower()}.json", + "wa", + ) as loc: + json.dump(logs, loc, indent=4) diff --git a/forest/oak/base.py b/forest/oak/base.py index 3fccb03e..f21ef82a 100644 --- a/forest/oak/base.py +++ b/forest/oak/base.py @@ -691,7 +691,7 @@ def run(study_folder: str, output_folder: str, tz_str: Optional[str] = None, 'walking_time': walkingtime_daily[:, -1], 'steps': steps_daily[:, -1], 'cadence': cadence_daily[:, -1]}) - output_file = user + "_gait_daily.csv" + output_file = user + ".csv" dest_path = os.path.join(output_folder, "daily", output_file) summary_stats.to_csv(dest_path, index=False) if frequency != Frequency.DAILY: diff --git a/forest/sycamore/base.py b/forest/sycamore/base.py index 67da6024..86649926 100644 --- a/forest/sycamore/base.py +++ b/forest/sycamore/base.py @@ -224,12 +224,16 @@ def compute_survey_stats( def get_submits_for_tableau( - study_folder: str, output_folder: str, config_path: str, - tz_str: str = "UTC", start_date: str = EARLIEST_DATE, - end_date: Optional[str] = None, users: Optional[List] = None, - interventions_filepath: Optional[str] = None, - submits_timeframe: Frequency = Frequency.DAILY, - history_path: Optional[str] = None + study_folder: str, + output_folder: str, + config_path: str, + tz_str: str = "UTC", + start_date: str = EARLIEST_DATE, + end_date: Optional[str] = None, + users: Optional[List] = None, + interventions_filepath: Optional[str] = None, + submits_timeframe: Frequency = Frequency.DAILY, + history_path: Optional[str] = None ) -> None: """Get survey submissions per day for integration into Tableau WDC @@ -247,8 +251,7 @@ def get_submits_for_tableau( end_date: The latest survey data to read in, in YYYY-MM-DD format users: - List of users in study for which we - are generating a survey schedule + List of users in study that we are generating a survey schedule for interventions_filepath: filepath where interventions json file is. submits_timeframe: @@ -257,65 +260,49 @@ def get_submits_for_tableau( history_path: Filepath to the survey history file. If this is not included, audio survey timings cannot be estimated. """ - if submits_timeframe not in [ Frequency.HOURLY, Frequency.DAILY, Frequency.HOURLY_AND_DAILY ]: logger.error("Error: Invalid submits timeframe") return + if submits_timeframe == Frequency.HOURLY_AND_DAILY: + submits_timeframes = [Frequency.HOURLY, Frequency.DAILY] + else: + submits_timeframes = [submits_timeframe] + os.makedirs(output_folder, exist_ok=True) + for freq in submits_timeframes: + os.makedirs(f"{output_folder}/{freq.name.lower()}", exist_ok=True) if users is None: users = get_ids(study_folder) - if end_date is None: end_date = get_month_from_today() # Read, aggregate and clean data - else: - agg_data = aggregate_surveys_config( - study_folder, config_path, tz_str, users, start_date, - end_date, augment_with_answers=True, include_audio_surveys=True - ) - - if agg_data.shape[0] == 0: - logger.error("Error: No survey data found in %s", study_folder) - return - - # Create survey submits detail and summary - ss_detail = survey_submits( - config_path, start_date, end_date, - users, agg_data, interventions_filepath, history_path - ) - - if ss_detail.shape[0] == 0: - logger.error("Error: no submission data found") - return + agg_data = aggregate_surveys_config( + study_folder, config_path, tz_str, users, start_date, + end_date, augment_with_answers=True, include_audio_surveys=True + ) - if submits_timeframe == Frequency.HOURLY_AND_DAILY: - ss_summary_h = summarize_submits( - ss_detail, Frequency.HOURLY, False - ) - ss_summary_d = summarize_submits( - ss_detail, Frequency.DAILY, False - ) + if agg_data.shape[0] == 0: + logger.error("Error: No survey data found in %s", study_folder) + return - write_data_by_user(ss_summary_d, - os.path.join(output_folder, "both", "daily"), - users) - write_data_by_user(ss_summary_h, - os.path.join(output_folder, "both", "hourly"), - users) + # Create survey submits detail and summary + ss_detail = survey_submits( + config_path, start_date, end_date, + users, agg_data, interventions_filepath, history_path + ) - elif submits_timeframe == Frequency.HOURLY: - ss_summary_h = summarize_submits( - ss_detail, Frequency.HOURLY, False - ) - write_data_by_user(ss_summary_h, output_folder, users) + if ss_detail.shape[0] == 0: + logger.error("Error: no submission data found") + return - elif submits_timeframe == Frequency.DAILY: - ss_summary_d = summarize_submits( - ss_detail, Frequency.DAILY, False - ) - write_data_by_user(ss_summary_d, output_folder, users) + # run once for every submits_timeframe, per-user is handled internally + for freq in submits_timeframes: + ss_summary = summarize_submits(ss_detail, freq, False) + write_data_by_user( + ss_summary, f"{output_folder}/{freq.name.lower()}", users + ) diff --git a/forest/sycamore/utils.py b/forest/sycamore/utils.py index a4f365cd..9b69b70f 100644 --- a/forest/sycamore/utils.py +++ b/forest/sycamore/utils.py @@ -31,8 +31,7 @@ def get_month_from_today(): datetime.timedelta(31)).strftime("%Y-%m-%d") -def filename_to_timestamp(filename: str, tz_str: str = "UTC" - ) -> pd.Timestamp: +def filename_to_timestamp(filename: str, tz_str: str = "UTC") -> pd.Timestamp: """Extract a datetime from a filepath. Args: diff --git a/forest/willow/log_stats.py b/forest/willow/log_stats.py index 3eee8a25..b797e981 100644 --- a/forest/willow/log_stats.py +++ b/forest/willow/log_stats.py @@ -413,7 +413,7 @@ def log_stats_main( frequency: Frequency, time_start: Optional[List] = None, time_end: Optional[List] = None, - beiwe_id: Optional[List[str]] = None, + beiwe_ids: Optional[List[str]] = None, ) -> None: """Main function for calculating the summary statistics for the communication logs. @@ -426,7 +426,7 @@ def log_stats_main( determining resolution of the summary stats time_start: starting timestamp of the study time_end: ending timestamp of the study - beiwe_id: list of Beiwe IDs to be processed + beiwe_ids: list of Beiwe IDs to be processed """ if frequency not in [ @@ -437,121 +437,104 @@ def log_stats_main( "HOURLY_AND_DAILY, DAILY, HOURLY" ) - os.makedirs(output_folder, exist_ok=True) - if frequency == Frequency.HOURLY_AND_DAILY: - os.makedirs(output_folder + "/hourly", exist_ok=True) - os.makedirs(output_folder + "/daily", exist_ok=True) + frequencies = [Frequency.HOURLY, Frequency.DAILY] + else: + frequencies = [frequency] + + os.makedirs(output_folder, exist_ok=True) + for freq in frequencies: + os.makedirs(f"{output_folder}/{freq.name.lower()}", exist_ok=True) # beiwe_id should be a list of str - if beiwe_id is None: - beiwe_id = [ - i for i in os.listdir(study_folder) - if os.path.isdir(f"{study_folder}/{i}") + if beiwe_ids is None: + beiwe_ids = [ + participant_id for participant_id in os.listdir(study_folder) + if os.path.isdir(f"{study_folder}/{participant_id}") ] - if len(beiwe_id) > 0: - for bid in beiwe_id: - logger.info("User: %s", bid) + # process the data for each participant in each frequency into a folder of + # the corresponding frequency. + for beiwe_id in beiwe_ids: + for freq in frequencies: + logger.info("(%s) Participant: %s", freq.name.lower(), beiwe_id) try: - # read data - text_data, text_stamp_start, text_stamp_end = read_data( - bid, study_folder, "texts", tz_str, time_start, time_end - ) - call_data, call_stamp_start, call_stamp_end = read_data( - bid, study_folder, "calls", tz_str, time_start, time_end + log_stats_inner( + beiwe_id, + f"{output_folder}/{freq.name.lower()}", + study_folder, + frequency, + tz_str, + time_start, + time_end ) + except Exception as err: + logger.error("An error occurred when processing data: %s", err) - if text_data.shape[0] > 0 or call_data.shape[0] > 0: - # stamps from call and text should be the stamp_end - logger.info("Data imported ...") - stamp_start = min(text_stamp_start, call_stamp_start) - stamp_end = max(text_stamp_end, call_stamp_end) - - # process data - if frequency == Frequency.HOURLY_AND_DAILY: - stats_pdframe1 = comm_logs_summaries( - text_data, - call_data, - stamp_start, - stamp_end, - tz_str, - Frequency.HOURLY, - ) - stats_pdframe2 = comm_logs_summaries( - text_data, - call_data, - stamp_start, - stamp_end, - tz_str, - Frequency.DAILY, - ) - - write_all_summaries( - bid, stats_pdframe1, output_folder + "/hourly" - ) - write_all_summaries( - bid, stats_pdframe2, output_folder + "/daily" - ) - else: - stats_pdframe = comm_logs_summaries( - text_data, - call_data, - stamp_start, - stamp_end, - tz_str, - frequency, - ) - # num_uniq_individuals_call_or_text is the cardinality - # of the union of several sets. It should should always - # be at least as large as the cardinality of any one of - # the sets, and it should never be larger than the sum - # of the cardinalities of all of the sets - # (it may be equal if all the sets are disjoint) - sum_all_set_cols = pd.Series( - [0]*stats_pdframe.shape[0] - ) - for col in [ - "num_s_tel", "num_r_tel", "num_in_caller", - "num_out_caller", "num_mis_caller" - ]: - sum_all_set_cols += stats_pdframe[col] - if ( - stats_pdframe[ - "num_uniq_individuals_call_or_text" - ] < stats_pdframe[col] - ).any(): - logger.error( - "Error: " - "num_uniq_individuals_call_or_text " - "was found to be less than %s for at " - "least one time interval. This error " - "comes from an issue with the code," - " not an issue with the input data", - col - ) - if ( - stats_pdframe[ - "num_uniq_individuals_call_or_text" - ] > sum_all_set_cols - ).any(): - logger.error( - "Error: " - "num_uniq_individuals_call_or_text " - "was found to be larger than the sum " - "of individual cardinalities for at " - "least one time interval. This error " - "comes from an issue with the code," - " not an issue with the input data" - ) - - write_all_summaries(bid, stats_pdframe, output_folder) - - logger.info( - "Summary statistics obtained. Finished." - ) + logger.info("Summary statistics obtained. Finished.") - except Exception as err: - logger.error( - "An error occurred when processing the data: %s", err - ) + +def log_stats_inner( + beiwe_id: str, + output_folder: str, + study_folder: str, + frequency: Frequency, + tz_str: str, + time_start: Optional[List] = None, + time_end: Optional[List] = None, +): + """ Inner functionality of log_stats_main """ + # read data + text_data, text_stamp_start, text_stamp_end = read_data( + beiwe_id, study_folder, "texts", tz_str, time_start, time_end + ) + call_data, call_stamp_start, call_stamp_end = read_data( + beiwe_id, study_folder, "calls", tz_str, time_start, time_end + ) + + # give up early if there is no data + if text_data.shape[0] <= 0 and call_data.shape[0] <= 0: + logger.info("There was no data for participant %s", beiwe_id) + return + + # stamps from call and text should be the stamp_end + logger.info("Data imported ...") + stamp_start = min(text_stamp_start, call_stamp_start) + stamp_end = max(text_stamp_end, call_stamp_end) + + # process the data + stats_pdframe = comm_logs_summaries( + text_data, call_data, stamp_start, stamp_end, tz_str, frequency + ) + + # num_uniq_individuals_call_or_text is the cardinality of the union of + # several sets. It should should always be at least as large as the + # cardinality of any one of the sets, and it should never be larger than + # the sum of the cardinalities of all of the sets. (it may be equal if all + # the sets are disjoint) + num_uniq_column = "num_uniq_individuals_call_or_text" # legibility hax. + sum_all_set_cols = pd.Series([0]*stats_pdframe.shape[0]) + for column in [ + "num_s_tel", "num_r_tel", "num_in_caller", + "num_out_caller", "num_mis_caller" + ]: + sum_all_set_cols += stats_pdframe[column] + if (stats_pdframe[num_uniq_column] < stats_pdframe[column]).any(): + logger.error( + "Error: " + "num_uniq_individuals_call_or_text was found to be less than " + "%s for at least one time interval. This error comes from an " + "issue with the code, not an issue with the input data.", + column + ) + + if (stats_pdframe[num_uniq_column] > sum_all_set_cols).any(): + logger.error( + "Error: " + "num_uniq_individuals_call_or_text was found to be larger than the" + "sum of individual cardinalities for at least one time interval. " + "This error comes from an issue with the code, not an issue with " + "the input data." + ) + + write_all_summaries(beiwe_id, stats_pdframe, output_folder)