From fab315572982a666dfaa39c282a2a0b552874c9a Mon Sep 17 00:00:00 2001 From: GeorgiosEfstathiadis <54844705+GeorgeEfstathiadis@users.noreply.github.com> Date: Thu, 3 Aug 2023 12:22:01 -0400 Subject: [PATCH] Willow clean log stats (#192) --- .flake8 | 3 - forest/willow/log_stats.py | 487 +++++++++++++++++++++++-------------- 2 files changed, 308 insertions(+), 182 deletions(-) delete mode 100644 .flake8 diff --git a/.flake8 b/.flake8 deleted file mode 100644 index 188720eb..00000000 --- a/.flake8 +++ /dev/null @@ -1,3 +0,0 @@ -[flake8] -exclude = - forest/willow/log_stats.py diff --git a/forest/willow/log_stats.py b/forest/willow/log_stats.py index c3103d43..8427b784 100644 --- a/forest/willow/log_stats.py +++ b/forest/willow/log_stats.py @@ -1,25 +1,250 @@ +"""This module contains functions for calculating summary statistics for the +communication logs. +""" +import logging import os -import sys +from typing import List, Optional import pandas as pd import numpy as np from forest.constants import Frequency from forest.poplar.legacy.common_funcs import ( - read_data, write_all_summaries, datetime2stamp, stamp2datetime + read_data, + write_all_summaries, + datetime2stamp, + stamp2datetime, ) -def comm_logs_summaries( - df_text, df_call, stamp_start, stamp_end, tz_str, frequency -): +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger() + + +def text_analysis( + df_text: pd.DataFrame, stamp: int, step_size: int, frequency: Frequency +) -> tuple: + """Calculate the summary statistics for the text data + in the given time interval. + + Args: + df_text: pd.DataFrame + dataframe of the text data + stamp: int + starting timestamp of the study + step_size: int + ending timestamp of the study + frequency: Frequency class, + determining resolution of the summary stats + + Returns: + tuple of summary statistics containing: + num_s: int + number of sent SMS + num_r: int + number of received SMS + num_mms_s: int + number of sent MMS + num_mms_r: int + number of received MMS + num_s_tel: int + number of unique phone numbers in sent SMS + num_r_tel: int + number of unique phone numbers in received SMS + total_char_s: int + total number of characters in sent SMS + total_char_r: int + total number of characters in received SMS + text_reciprocity_incoming: int + number of received SMS without response + text_reciprocity_outgoing: int + number of sent SMS without response """ - Docstring + # filter the data based on the timestamp + temp_text = df_text[ + (df_text["timestamp"] / 1000 >= stamp) + & (df_text["timestamp"] / 1000 < stamp + step_size) + ] + + # calculate the number of texts + message_lengths = np.array(temp_text["message length"]) + for k, mlength in enumerate(message_lengths): + if mlength == "MMS": + message_lengths[k] = 0 + if not isinstance(mlength, str): + if np.isnan(mlength): + message_lengths[k] = 0 + + message_lengths = message_lengths.astype(int) + + index_s = np.array(temp_text["sent vs received"]) == "sent SMS" + index_r = np.array(temp_text["sent vs received"]) == "received SMS" + + send_to_number = np.unique( + np.array(temp_text["hashed phone number"])[index_s] + ) + receive_from_number = np.unique( + np.array(temp_text["hashed phone number"])[index_r] + ) + + num_s_tel = len(send_to_number) + num_r_tel = len(receive_from_number) + + index_mms_s = np.array(temp_text["sent vs received"]) == "sent MMS" + index_mms_r = np.array(temp_text["sent vs received"]) == "received MMS" + + num_s = sum(index_s.astype(int)) + num_r = sum(index_r.astype(int)) + num_mms_s = 
sum(index_mms_s.astype(int)) + num_mms_r = sum(index_mms_r.astype(int)) + total_char_s = sum(message_lengths[index_s]) + total_char_r = sum(message_lengths[index_r]) + + text_reciprocity_incoming = None + text_reciprocity_outgoing = None + + if frequency == Frequency.DAILY: + # find the phone number in sent_from, but not in send_to + received_no_response = [ + tel for tel in receive_from_number if tel not in send_to_number + ] + sent_no_response = [ + tel for tel in send_to_number if tel not in receive_from_number + ] + + text_reciprocity_incoming = 0 + for tel in received_no_response: + text_reciprocity_incoming += sum( + index_r + * (np.array(temp_text["hashed phone number"]) == tel) + ) + + text_reciprocity_outgoing = 0 + for tel in sent_no_response: + text_reciprocity_outgoing += sum( + index_s + * (np.array(temp_text["hashed phone number"]) == tel) + ) + + return ( + num_s, + num_r, + num_mms_s, + num_mms_r, + num_s_tel, + num_r_tel, + total_char_s, + total_char_r, + text_reciprocity_incoming, + text_reciprocity_outgoing, + ) + + +def call_analysis(df_call: pd.DataFrame, stamp: int, step_size: int) -> tuple: + """Calculate the summary statistics for the call data + in the given time interval. + Args: - frequency: Frequency class, determining resolution of the summary stats - tz_str: timezone where the study was/is conducted - The other inputs are the outputs from read_comm_logs(). - Return: pandas dataframe of summary stats + df_call: pd.DataFrame + dataframe of the call data + stamp: int + starting timestamp of the study + step_size: int + ending timestamp of the study + + Returns: + tuple of summary statistics containing: + num_in_call: int + number of incoming calls + num_out_call: int + number of outgoing calls + num_mis_call: int + number of missed calls + num_uniq_in_call: int + number of unique phone numbers in incoming calls + num_uniq_out_call: int + number of unique phone numbers in outgoing calls + num_uniq_mis_call: int + number of unique phone numbers in missed calls + total_time_in_call: int + total time in minutes of incoming calls + total_time_out_call: int + total time in minutes of outgoing calls + """ + # filter the data based on the timestamp + temp_call = df_call[ + (df_call["timestamp"] / 1000 >= stamp) + & (df_call["timestamp"] / 1000 < stamp + step_size) + ] + + dur_in_sec = np.array(temp_call["duration in seconds"]) + dur_in_sec[np.isnan(dur_in_sec)] = 0 + dur_in_min = dur_in_sec / 60 + + index_in_call = np.array(temp_call["call type"]) == "Incoming Call" + index_out_call = np.array(temp_call["call type"]) == "Outgoing Call" + index_mis_call = np.array(temp_call["call type"]) == "Missed Call" + + num_in_call = sum(index_in_call) + num_out_call = sum(index_out_call) + num_mis_call = sum(index_mis_call) + + num_uniq_in_call = len( + np.unique( + np.array(temp_call["hashed phone number"])[index_in_call] + ) + ) + num_uniq_out_call = len( + np.unique( + np.array(temp_call["hashed phone number"])[index_out_call] + ) + ) + num_uniq_mis_call = len( + np.unique( + np.array(temp_call["hashed phone number"])[index_mis_call] + ) + ) + + total_time_in_call = sum(dur_in_min[index_in_call]) + total_time_out_call = sum(dur_in_min[index_out_call]) + + return ( + num_in_call, + num_out_call, + num_mis_call, + num_uniq_in_call, + num_uniq_out_call, + num_uniq_mis_call, + total_time_in_call, + total_time_out_call, + ) + + +def comm_logs_summaries( + df_text: pd.DataFrame, df_call: pd.DataFrame, stamp_start: int, + stamp_end: int, tz_str: str, frequency: Frequency +) -> 
pd.DataFrame: + """Calculate the summary statistics for the communication logs. + + Args: + df_text: pd.DataFrame + dataframe of the text data + df_call: pd.DataFrame + dataframe of the call data + stamp_start: int + starting timestamp of the study + stamp_end: int + ending timestamp of the study + tz_str: str + timezone where the study was/is conducted + frequency: Frequency class, + determining resolution of the summary stats + + Returns: + pandas dataframe of summary stats + + Raises: + ValueError: if frequency is not of correct type """ summary_stats = [] start_year, start_month, start_day, start_hour, _, _ = stamp2datetime( @@ -48,148 +273,35 @@ def comm_logs_summaries( [end_year, end_month, end_day, end_hour, 0, 0], tz_str ) + # determine the step size based on the frequency + # step_size is in seconds step_size = 3600 * frequency.value # for each chunk, calculate the summary statistics (colmean or count) for stamp in np.arange(table_start, table_end + 1, step=step_size): - year, month, day, hour, minute, second = stamp2datetime(stamp, tz_str) - ( - num_in_call, - num_out_call, - num_mis_call, - num_uniq_in_call, - num_uniq_out_call, - num_uniq_mis_call, - total_time_in_call, - total_time_out_call, - num_s, - num_r, - num_mms_s, - num_mms_r, - num_s_tel, - num_r_tel, - total_char_s, - total_char_r, - text_reciprocity_incoming, - text_reciprocity_outgoing, - ) = [pd.NA] * 18 + year, month, day, hour, _, _ = stamp2datetime(stamp, tz_str) + # initialize the summary statistics + newline = [] + + if df_call.shape[0] > 0: + call_stats = call_analysis(df_call, stamp, step_size) + newline += list(call_stats) + else: + newline += [pd.NA] * 8 if df_text.shape[0] > 0: - temp_text = df_text[ - (df_text["timestamp"] / 1000 >= stamp) - & (df_text["timestamp"] / 1000 < stamp + step_size) - ] - m_len = np.array(temp_text["message length"]) - for k in range(len(m_len)): - if m_len[k] == "MMS": - m_len[k] = 0 - if isinstance(m_len[k], str) is False: - if np.isnan(m_len[k]): - m_len[k] = 0 - m_len = m_len.astype(int) - - index_s = np.array(temp_text["sent vs received"]) == "sent SMS" - index_r = np.array(temp_text["sent vs received"]) == "received SMS" - send_to_number = np.unique( - np.array(temp_text["hashed phone number"])[index_s] - ) - receive_from_number = np.unique( - np.array(temp_text["hashed phone number"])[index_r] - ) - num_s_tel = len(send_to_number) - num_r_tel = len(receive_from_number) - index_mms_s = np.array(temp_text["sent vs received"]) == "sent MMS" - index_mms_r = ( - np.array(temp_text["sent vs received"]) == "received MMS" - ) - num_s = sum(index_s.astype(int)) - num_r = sum(index_r.astype(int)) - num_mms_s = sum(index_mms_s.astype(int)) - num_mms_r = sum(index_mms_r.astype(int)) - total_char_s = sum(m_len[index_s]) - total_char_r = sum(m_len[index_r]) - if frequency == Frequency.DAILY: - received_no_response = [] - sent_no_response = [] - # find the phone number in sent_from, but not in send_to - for tel in receive_from_number: - if tel not in send_to_number: - received_no_response.append(tel) - for tel in send_to_number: - if tel not in receive_from_number: - sent_no_response.append(tel) - text_reciprocity_incoming = 0 - text_reciprocity_outgoing = 0 - for tel in received_no_response: - text_reciprocity_incoming += sum( - index_r * - (np.array(temp_text["hashed phone number"]) == tel) - ) - for tel in sent_no_response: - text_reciprocity_outgoing += sum( - index_s * - (np.array(temp_text["hashed phone number"]) == tel) - ) + text_stats = text_analysis(df_text, stamp, 
step_size, frequency) + newline += list(text_stats) + else: + newline += [pd.NA] * 10 - if df_call.shape[0] > 0: - temp_call = df_call[ - (df_call["timestamp"] / 1000 >= stamp) - & (df_call["timestamp"] / 1000 < stamp + step_size) - ] - dur_in_sec = np.array(temp_call["duration in seconds"]) - dur_in_sec[np.isnan(dur_in_sec)] = 0 - dur_in_min = dur_in_sec / 60 - index_in_call = np.array(temp_call["call type"]) == "Incoming Call" - index_out_call = ( - np.array(temp_call["call type"]) == "Outgoing Call" - ) - index_mis_call = np.array(temp_call["call type"]) == "Missed Call" - num_in_call = sum(index_in_call) - num_out_call = sum(index_out_call) - num_mis_call = sum(index_mis_call) - num_uniq_in_call = len( - np.unique( - np.array(temp_call["hashed phone number"])[index_in_call] - ) - ) - num_uniq_out_call = len( - np.unique( - np.array(temp_call["hashed phone number"])[index_out_call] - ) - ) - num_uniq_mis_call = len( - np.unique( - np.array(temp_call["hashed phone number"])[index_mis_call] - ) - ) - total_time_in_call = sum(dur_in_min[index_in_call]) - total_time_out_call = sum(dur_in_min[index_out_call]) - newline = [ - num_in_call, - num_out_call, - num_mis_call, - num_uniq_in_call, - num_uniq_out_call, - num_uniq_mis_call, - total_time_in_call, - total_time_out_call, - num_s, - num_r, - num_mms_s, - num_mms_r, - num_s_tel, - num_r_tel, - total_char_s, - total_char_r, - ] if frequency == Frequency.DAILY: - newline = [year, month, day] + newline + [ - text_reciprocity_incoming, - text_reciprocity_outgoing, - ] + newline = [year, month, day] + newline else: - newline = [year, month, day, hour] + newline + newline = [year, month, day, hour] + newline[:16] + summary_stats.append(newline) + columns = [ "num_in_call", "num_out_call", @@ -211,17 +323,19 @@ def comm_logs_summaries( if frequency == Frequency.DAILY: return pd.DataFrame( summary_stats, - columns=["year", "month", "day"] + columns + [ + columns=["year", "month", "day"] + + columns + + [ "text_reciprocity_incoming", "text_reciprocity_outgoing", - ], - ) - else: - return pd.DataFrame( - summary_stats, - columns=["year", "month", "day", "hour"] + columns, + ], ) + return pd.DataFrame( + summary_stats, + columns=["year", "month", "day", "hour"] + columns, + ) + # Main function/wrapper should take standard arguments with Beiwe names: def log_stats_main( @@ -229,42 +343,54 @@ def log_stats_main( output_folder: str, tz_str: str, frequency: Frequency, - time_start=None, - time_end=None, - beiwe_id=None, -): - if os.path.exists(output_folder) is False: - os.mkdir(output_folder) + time_start: Optional[List] = None, + time_end: Optional[List] = None, + beiwe_id: Optional[List[str]] = None, +) -> None: + """Main function for calculating the summary statistics for the + communication logs. 
+ + Args: + study_folder: path to the study folder + output_folder: path to the output folder + tz_str: timezone where the study was/is conducted + frequency: Frequency class, + determining resolution of the summary stats + time_start: starting timestamp of the study + time_end: ending timestamp of the study + beiwe_id: list of Beiwe IDs to be processed + """ + os.makedirs(output_folder, exist_ok=True) + if frequency == Frequency.HOURLY_AND_DAILY: - if os.path.exists(output_folder + "/hourly") is False: - os.mkdir(output_folder + "/hourly") - if os.path.exists(output_folder + "/daily") is False: - os.mkdir(output_folder + "/daily") + os.makedirs(output_folder + "/hourly", exist_ok=True) + os.makedirs(output_folder + "/daily", exist_ok=True) + # beiwe_id should be a list of str if beiwe_id is None: - beiwe_id = os.listdir(study_folder) - id_w_folder = [] - for i in beiwe_id: - if os.path.isdir(study_folder + "/" + i): - id_w_folder.append(i) - beiwe_id = id_w_folder + beiwe_id = [ + i for i in os.listdir(study_folder) + if os.path.isdir(f"{study_folder}/{i}") + ] if len(beiwe_id) > 0: - for ID in beiwe_id: - sys.stdout.write("User: " + ID + "\n") + for bid in beiwe_id: + logger.info("User: %s", bid) try: # read data text_data, text_stamp_start, text_stamp_end = read_data( - ID, study_folder, "texts", tz_str, time_start, time_end + bid, study_folder, "texts", tz_str, time_start, time_end ) call_data, call_stamp_start, call_stamp_end = read_data( - ID, study_folder, "calls", tz_str, time_start, time_end + bid, study_folder, "calls", tz_str, time_start, time_end ) + if text_data.shape[0] > 0 or call_data.shape[0] > 0: # stamps from call and text should be the stamp_end - sys.stdout.write("Data imported ..." + "\n") + logger.info("Data imported ...") stamp_start = min(text_stamp_start, call_stamp_start) stamp_end = max(text_stamp_end, call_stamp_end) + # process data if frequency == Frequency.HOURLY_AND_DAILY: stats_pdframe1 = comm_logs_summaries( @@ -283,11 +409,12 @@ def log_stats_main( tz_str, Frequency.DAILY, ) + write_all_summaries( - ID, stats_pdframe1, output_folder + "/hourly" + bid, stats_pdframe1, output_folder + "/hourly" ) write_all_summaries( - ID, stats_pdframe2, output_folder + "/daily" + bid, stats_pdframe2, output_folder + "/daily" ) else: stats_pdframe = comm_logs_summaries( @@ -298,12 +425,14 @@ def log_stats_main( tz_str, frequency, ) - write_all_summaries(ID, stats_pdframe, output_folder) - sys.stdout.write( - "Summary statistics obtained. Finished.\n" + + write_all_summaries(bid, stats_pdframe, output_folder) + + logger.info( + "Summary statistics obtained. Finished." ) - except: - sys.stdout.write( - "An error occured when processing the data.\n" + + except Exception as err: + logger.error( + "An error occurred when processing the data: %s", err ) - pass
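
Note on the refactor: the per-interval work is now split into text_analysis and call_analysis. Below is a minimal sketch of calling text_analysis directly on a synthetic frame, assuming the Beiwe column names used in the new code ("timestamp" in milliseconds, "message length", "sent vs received", "hashed phone number") and that passing Frequency.DAILY enables the reciprocity counts. The input values are made up and the expected tuple is hand-derived from the new function body, so treat it as illustrative rather than a recorded result.

import pandas as pd

from forest.constants import Frequency
from forest.willow.log_stats import text_analysis

stamp = 1_690_000_000          # interval start, in seconds
step_size = 24 * 3600          # one day, matching a daily resolution

df_text = pd.DataFrame({
    # timestamps are stored in milliseconds; the function divides by 1000
    "timestamp": [(stamp + 60) * 1000, (stamp + 120) * 1000],
    "message length": [12, 34],
    "sent vs received": ["sent SMS", "received SMS"],
    "hashed phone number": ["aaa", "bbb"],
})

stats = text_analysis(df_text, stamp, step_size, Frequency.DAILY)
# expected: (1, 1, 0, 0, 1, 1, 12, 34, 1, 1)
# one sent and one received SMS, no MMS, one unique number in each direction,
# 12 and 34 characters sent/received, and both messages counted as unanswered
# because the two hashed numbers never exchange a reply within the interval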
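At the wrapper level the call signature is unchanged apart from the added type hints, so a typical invocation still looks like the sketch below. The folder paths and timezone are illustrative assumptions; per the read_data calls in the new code, the study folder is expected to contain one sub-folder per Beiwe ID holding "texts" and "calls" streams.

from forest.constants import Frequency
from forest.willow.log_stats import log_stats_main

log_stats_main(
    study_folder="data/raw",           # assumed layout: one sub-folder per Beiwe ID
    output_folder="data/summaries",    # created via os.makedirs(..., exist_ok=True)
    tz_str="America/New_York",         # illustrative timezone
    frequency=Frequency.HOURLY_AND_DAILY,  # writes both .../hourly and .../daily
)

With HOURLY_AND_DAILY the wrapper computes two summary frames per participant and writes them through write_all_summaries, one per resolution; per-participant failures are now reported through logger.error rather than sys.stdout, matching the logging setup added at module import.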