From 3eb418801f9fac585552af04a388960e556d536b Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Tue, 31 Dec 2024 09:12:24 +0000 Subject: [PATCH] Addressed reviewer comments --- tools/differ/README.md | 29 +++--- tools/differ/differ.py | 95 ++++++++++--------- tools/differ/differ_test.py | 16 ++-- tools/differ/helper.py | 32 ++++--- .../test/results/point-analysis-results.csv | 7 ++ .../test/results/point-analysis-summary.csv | 5 + .../test/results/series-analysis-results.csv | 6 ++ .../test/results/series-analysis-summary.csv | 5 + 8 files changed, 119 insertions(+), 76 deletions(-) create mode 100644 tools/differ/test/results/point-analysis-results.csv create mode 100644 tools/differ/test/results/point-analysis-summary.csv create mode 100644 tools/differ/test/results/series-analysis-results.csv create mode 100644 tools/differ/test/results/series-analysis-summary.csv diff --git a/tools/differ/README.md b/tools/differ/README.md index 16d9fc8cc4..80b9ee5b78 100644 --- a/tools/differ/README.md +++ b/tools/differ/README.md @@ -7,23 +7,26 @@ This utility generates a diff (point and series analysis) of two versions of the python differ.py --current_data= --previous_data= ``` -Parameter description -current_data: Path to the current MCF data (single mcf file or folder/* supported). -previous_data: Path to the previous MCF data (single mcf file or folder/* supported). -output_location: Path to the output data folder. -groupby_columns: Columns to group data for diff analysis in the order var,place,time etc. Default value: “variableMeasured,observationAbout,observationDate” -value_columns: Columns with statvar value (unit etc.) for diff analysis. Default value: "value,unit" +Parameter description: +- current\_data: Path to the current MCF data (single mcf file or folder/* on local/GCS supported). +- previous\_data: Path to the previous MCF data (single mcf file or folder/* on local/GCS supported). +- output\_location: Path to the output data folder. Default value: results. 
+- groupby\_columns: Columns to group data for diff analysis in the order var,place,time etc. Default value: "variableMeasured,observationAbout,observationDate,measurementMethod,unit". +- value\_columns: Columns with statvar value for diff analysis. Default value: "value,scalingFactor". + +**Output** Summary output generated is of the form below showing counts of differences for each variable. -variableMeasured added deleted modified same total -0 dcid:var1 1 0 0 0 1 -1 dcid:var2 0 2 1 1 4 -2 dcid:var3 0 0 1 0 1 -3 dcid:var4 0 2 0 0 2 +| |variableMeasured|added|deleted|modified|same|total| +|---|---|---|---|---|---|---| +|0|dcid:var1|1|0|0|0|1| +|1|dcid:var2|0|2|1|1|4| +|2|dcid:var3|0|0|1|0|1| +|3|dcid:var4|0|2|0|0|2| -Detailed diff output is written to files for further analysis. +Detailed diff output is written to files for further analysis. Sample result files can be found under folder 'test/results'. - point-analysis-summary.csv: diff summry for point analysis - point-analysis-results.csv: detailed results for point analysis - series-analysis-summary.csv: diff summry for series analysis -- series-analysis-results.csv: detailed results for series analysis \ No newline at end of file +- series-analysis-results.csv: detailed results for series analysis diff --git a/tools/differ/differ.py b/tools/differ/differ.py index 882cf1719b..2e53b0882b 100644 --- a/tools/differ/differ.py +++ b/tools/differ/differ.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -26,20 +26,20 @@ FLAGS = flags.FLAGS flags.DEFINE_string( 'current_data', '', 'Path to the current MCF data \ - (single mcf file or folder/* supported).') + (single mcf file or folder/* on local/GCS supported).') flags.DEFINE_string( 'previous_data', '', 'Path to the previous MCF data \ - (single mcf file or folder/* supported).') + (single mcf file or folder/* on local/GCS supported).') flags.DEFINE_string('output_location', 'results', \ 'Path to the output data folder.') flags.DEFINE_string( - 'groupby_columns', 'variableMeasured,observationAbout,observationDate', + 'groupby_columns', + 'variableMeasured,observationAbout,observationDate,measurementMethod,unit', 'Columns to group data for diff analysis in the order (var,place,time etc.).' ) -flags.DEFINE_string( - 'value_columns', 'value,unit', - 'Columns with statvar value (unit etc.) for diff analysis.') +flags.DEFINE_string('value_columns', 'value,scalingFactor', + 'Columns with statvar value for diff analysis.') SAMPLE_COUNT = 3 @@ -69,13 +69,17 @@ class DatasetDiffer: """ - def __init__(self, groupby_columns, value_columns): + def __init__(self, current_data, previous_data, output_location, + groupby_columns, value_columns): + self.current_data = current_data + self.previous_data = previous_data + self.output_location = output_location self.groupby_columns = groupby_columns.split(',') self.value_columns = value_columns.split(',') self.variable_column = self.groupby_columns[0] self.place_column = self.groupby_columns[1] self.time_column = self.groupby_columns[2] - self.diff_column = '_diff_result' + self.diff_column = 'diff_result' def _cleanup_data(self, df: pd.DataFrame): for column in ['added', 'deleted', 'modified', 'same']: @@ -208,43 +212,48 @@ def series_analysis(self, axis=1) return summary, result + def run_differ(self): + if not os.path.exists(FLAGS.output_location): + os.makedirs(FLAGS.output_location) + logging.info('Loading data...') + current_df = helper.load_data(self.current_data, 
self.output_location) + previous_df = helper.load_data(self.previous_data, self.output_location) + + logging.info('Processing data...') + in_data = self.process_data(previous_df, current_df) + + logging.info('Point analysis:') + summary, result = self.point_analysis(in_data) + result.sort_values(by=[self.diff_column, self.variable_column], + inplace=True) + print(summary.head(10)) + print(result.head(10)) + helper.write_data(summary, self.output_location, + 'point-analysis-summary.csv') + helper.write_data(result, self.output_location, + 'point-analysis-results.csv') + + logging.info('Series analysis:') + summary, result = self.series_analysis(in_data) + result.sort_values(by=[self.diff_column, self.variable_column], + inplace=True) + print(summary.head(10)) + print(result.head(10)) + helper.write_data(summary, self.output_location, + 'series-analysis-summary.csv') + helper.write_data(result, self.output_location, + 'series-analysis-results.csv') + + logging.info('Differ output written to folder: %s', + self.output_location) + def main(_): '''Runs the differ.''' - differ = DatasetDiffer(FLAGS.groupby_columns, FLAGS.value_columns) - - if not os.path.exists(FLAGS.output_location): - os.makedirs(FLAGS.output_location) - logging.info('Loading data...') - current_df = helper.load_data(FLAGS.current_data, FLAGS.output_location) - previous_df = helper.load_data(FLAGS.previous_data, FLAGS.output_location) - - logging.info('Processing data...') - in_data = differ.process_data(previous_df, current_df) - - logging.info('Point analysis:') - summary, result = differ.point_analysis(in_data) - result.sort_values(by=[differ.diff_column, differ.variable_column], - inplace=True) - print(summary.head(10)) - print(result.head(10)) - helper.write_data(summary, FLAGS.output_location, - 'point-analysis-summary.csv') - helper.write_data(result, FLAGS.output_location, - 'point-analysis-results.csv') - - logging.info('Series analysis:') - summary, result = differ.series_analysis(in_data) - 
result.sort_values(by=[differ.diff_column, differ.variable_column], - inplace=True) - print(summary.head(10)) - print(result.head(10)) - helper.write_data(summary, FLAGS.output_location, - 'series-analysis-summary.csv') - helper.write_data(result, FLAGS.output_location, - 'series-analysis-results.csv') - - logging.info('Differ output written to folder: %s', FLAGS.output_location) + differ = DatasetDiffer(FLAGS.current_data, FLAGS.previous_data, + FLAGS.output_location, FLAGS.groupby_columns, + FLAGS.value_columns) + differ.run_differ() if __name__ == '__main__': diff --git a/tools/differ/differ_test.py b/tools/differ/differ_test.py index 905f7b36d6..6694b337b7 100644 --- a/tools/differ/differ_test.py +++ b/tools/differ/differ_test.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -33,12 +33,14 @@ class TestDiffer(unittest.TestCase): def test_diff_analysis(self): groupby_columns = 'variableMeasured,observationAbout,observationDate' value_columns = 'value' - - differ = DatasetDiffer(groupby_columns, value_columns) - current = helper.load_mcf_file( - os.path.join(module_dir, 'test', 'current.mcf')) - previous = helper.load_mcf_file( - os.path.join(module_dir, 'test', 'previous.mcf')) + current_data = os.path.join(module_dir, 'test', 'current.mcf') + previous_data = os.path.join(module_dir, 'test', 'previous.mcf') + output_location = os.path.join(module_dir, 'test') + + differ = DatasetDiffer(current_data, previous_data, output_location, + groupby_columns, value_columns) + current = helper.load_mcf_file(current_data) + previous = helper.load_mcf_file(previous_data) in_data = differ.process_data(previous, current) summary, result = differ.point_analysis(in_data) diff --git a/tools/differ/helper.py b/tools/differ/helper.py index 82e03d5ffa..40a8bc6a00 100644 --- a/tools/differ/helper.py +++ 
b/tools/differ/helper.py @@ -3,6 +3,7 @@ import pandas as pd import re +from absl import logging from google.cloud.storage import Client @@ -14,7 +15,7 @@ def load_mcf_file(file: str) -> pd.DataFrame: # nodes separated by a blank line mcf_nodes_text = mcf_contents.split('\n\n') # lines seprated as property: constraint - mcf_line = re.compile(r'^(\w+): (.*)$') + mcf_line = re.compile(r'^(\w+)\s*:\s*(.*)$') mcf_nodes = [] for node in mcf_nodes_text: current_mcf_node = {} @@ -22,9 +23,12 @@ def load_mcf_file(file: str) -> pd.DataFrame: parsed_line = mcf_line.match(line) if parsed_line is not None: current_mcf_node[parsed_line.group(1)] = parsed_line.group(2) - if current_mcf_node and current_mcf_node[ - 'typeOf'] == 'dcid:StatVarObservation': - mcf_nodes.append(current_mcf_node) + if current_mcf_node: + if current_mcf_node['typeOf'] == 'dcid:StatVarObservation': + mcf_nodes.append(current_mcf_node) + else: + logging.warning( + f'Ignoring node of type:{current_mcf_node["typeOf"]}') df = pd.DataFrame(mcf_nodes) return df @@ -32,17 +36,17 @@ def load_mcf_file(file: str) -> pd.DataFrame: def load_mcf_files(path: str) -> pd.DataFrame: """ Loads all sharded mcf files in the given directory and returns a single combined dataframe.""" - df = pd.DataFrame() + df_list = [] filenames = glob.glob(path + '.mcf') for filename in filenames: - df2 = load_mcf_file(filename) - # Merge data frames, expects same headers - df = pd.concat([df, df2]) - return df + df = load_mcf_file(filename) + df_list.append(df) + result = pd.concat(df_list, ignore_index=True) + return result def write_data(df: pd.DataFrame, path: str, file: str): - """ Write a dataframe to a CSV file with the given path.""" + """ Writes a dataframe to a CSV file with the given path.""" out_file = open(os.path.join(path, file), mode='w', encoding='utf-8') df.to_csv(out_file, index=False, mode='w') out_file.close() @@ -52,6 +56,7 @@ def load_data(path: str, tmp_dir: str) -> pd.DataFrame: """ Loads data from the given path 
and returns as a dataframe. Args: path: local or gcs path (single file or folder/* format) + tmp_dir: destination folder Returns: dataframe with the input data """ @@ -68,6 +73,7 @@ def get_gcs_data(uri: str, tmp_dir: str) -> str: """ Downloads files form GCS and copies them to local. Args: uri: single file path or folder/* format + tmp_dir: destination folder Returns: path to the output file/folder """ @@ -77,12 +83,12 @@ def get_gcs_data(uri: str, tmp_dir: str) -> str: if uri.endswith('*'): blobs = client.list_blobs(bucket) for blob in blobs: - path = os.path.join(os.getcwd(), tmp_dir, blob.name) + path = os.path.join(tmp_dir, blob.name.replace('/', '_')) blob.download_to_filename(path) - return os.path.join(os.getcwd(), tmp_dir, '*') + return os.path.join(tmp_dir, '*') else: file_name = uri.split('/')[3] blob = bucket.get_blob(file_name) - path = os.path.join(os.getcwd(), tmp_dir, file_name) + path = os.path.join(tmp_dir, blob.name.replace('/', '_')) blob.download_to_filename(path) return path diff --git a/tools/differ/test/results/point-analysis-results.csv b/tools/differ/test/results/point-analysis-results.csv new file mode 100644 index 0000000000..80feb425a5 --- /dev/null +++ b/tools/differ/test/results/point-analysis-results.csv @@ -0,0 +1,7 @@ +variableMeasured,_diff_result,observationAbout,observationDate,size +dcid:Max_Concentration_AirPollutant_Ozone,added,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],"['""2024-09-24T12:00:00""']",1 +dcid:Mean_Concentration_AirPollutant_CO,deleted,"['dcid:cpcpAq/Secretariat_Amaravati___APPCB', 'dcid:cpcpAq/Secretariat_Amaravati___APPCB']","['""2024-09-24T12:00:00""', '""2024-09-25T12:00:00""']",2 +dcid:Min_Concentration_AirPollutant_Ozone,deleted,"['dcid:cpcpAq/Secretariat_Amaravati___APPCB', 'dcid:cpcpAq/Secretariat_Amaravati___IMD']","['""2024-09-24T12:00:00""', '""2024-09-24T12:00:00""']",2 +dcid:Mean_Concentration_AirPollutant_CO,modified,['dcid:cpcpAq/Secretariat_Amaravati___IMD'],"['""2024-09-24T12:00:00""']",1 
+dcid:Mean_Concentration_AirPollutant_Ozone,modified,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],"['""2024-09-24T12:00:00""']",1 +dcid:Mean_Concentration_AirPollutant_CO,same,['dcid:cpcpAq/Secretariat_Amaravati___IMD'],"['""2024-09-25T12:00:00""']",1 diff --git a/tools/differ/test/results/point-analysis-summary.csv b/tools/differ/test/results/point-analysis-summary.csv new file mode 100644 index 0000000000..4d344b5639 --- /dev/null +++ b/tools/differ/test/results/point-analysis-summary.csv @@ -0,0 +1,5 @@ +variableMeasured,added,deleted,modified,same,total +dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1 +dcid:Mean_Concentration_AirPollutant_CO,0,2,1,1,4 +dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1 +dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2 diff --git a/tools/differ/test/results/series-analysis-results.csv b/tools/differ/test/results/series-analysis-results.csv new file mode 100644 index 0000000000..b776dbd2f5 --- /dev/null +++ b/tools/differ/test/results/series-analysis-results.csv @@ -0,0 +1,6 @@ +variableMeasured,_diff_result,observationAbout,size +dcid:Max_Concentration_AirPollutant_Ozone,added,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],1 +dcid:Mean_Concentration_AirPollutant_CO,deleted,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],1 +dcid:Min_Concentration_AirPollutant_Ozone,deleted,"['dcid:cpcpAq/Secretariat_Amaravati___IMD', 'dcid:cpcpAq/Secretariat_Amaravati___APPCB']",2 +dcid:Mean_Concentration_AirPollutant_CO,modified,['dcid:cpcpAq/Secretariat_Amaravati___IMD'],1 +dcid:Mean_Concentration_AirPollutant_Ozone,modified,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],1 diff --git a/tools/differ/test/results/series-analysis-summary.csv b/tools/differ/test/results/series-analysis-summary.csv new file mode 100644 index 0000000000..4f3b954643 --- /dev/null +++ b/tools/differ/test/results/series-analysis-summary.csv @@ -0,0 +1,5 @@ +variableMeasured,added,deleted,modified,same,total +dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1 
+dcid:Mean_Concentration_AirPollutant_CO,0,1,1,0,2 +dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1 +dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2