Skip to content

Commit

Permalink
Addressed reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vish-cs committed Jan 2, 2025
1 parent 890ef43 commit 3eb4188
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 76 deletions.
29 changes: 16 additions & 13 deletions tools/differ/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,26 @@ This utility generates a diff (point and series analysis) of two versions of the
python differ.py --current_data=<path> --previous_data=<path>
```

Parameter description
current_data: Path to the current MCF data (single mcf file or folder/* supported).
previous_data: Path to the previous MCF data (single mcf file or folder/* supported).
output_location: Path to the output data folder.
groupby_columns: Columns to group data for diff analysis in the order var,place,time etc. Default value: “variableMeasured,observationAbout,observationDate”
value_columns: Columns with statvar value (unit etc.) for diff analysis. Default value: "value,unit"
Parameter description:
- current\_data: Path to the current MCF data (single mcf file or folder/* on local/GCS supported).
- previous\_data: Path to the previous MCF data (single mcf file or folder/* on local/GCS supported).
- output\_location: Path to the output data folder. Default value: results.
- groupby\_columns: Columns to group data for diff analysis in the order var,place,time etc. Default value: "variableMeasured,observationAbout,observationDate,measurementMethod,unit".
- value\_columns: Columns with statvar value for diff analysis. Default value: "value,scalingFactor".

**Output**

Summary output generated is of the form below showing counts of differences for each variable.

variableMeasured added deleted modified same total
0 dcid:var1 1 0 0 0 1
1 dcid:var2 0 2 1 1 4
2 dcid:var3 0 0 1 0 1
3 dcid:var4 0 2 0 0 2
| |variableMeasured|added|deleted|modified|same|total|
|---|---|---|---|---|---|---|
|0|dcid:var1|1|0|0|0|1|
|1|dcid:var2|0|2|1|1|4|
|2|dcid:var3|0|0|1|0|1|
|3|dcid:var4|0|2|0|0|2|

Detailed diff output is written to files for further analysis.
Detailed diff output is written to files for further analysis. Sample result files can be found under folder 'test/results'.
- point-analysis-summary.csv: diff summary for point analysis
- point-analysis-results.csv: detailed results for point analysis
- series-analysis-summary.csv: diff summary for series analysis
- series-analysis-results.csv: detailed results for series analysis
- series-analysis-results.csv: detailed results for series analysis
95 changes: 52 additions & 43 deletions tools/differ/differ.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,20 +26,20 @@
FLAGS = flags.FLAGS
flags.DEFINE_string(
'current_data', '', 'Path to the current MCF data \
(single mcf file or folder/* supported).')
(single mcf file or folder/* on local/GCS supported).')
flags.DEFINE_string(
'previous_data', '', 'Path to the previous MCF data \
(single mcf file or folder/* supported).')
(single mcf file or folder/* on local/GCS supported).')
flags.DEFINE_string('output_location', 'results', \
'Path to the output data folder.')

flags.DEFINE_string(
'groupby_columns', 'variableMeasured,observationAbout,observationDate',
'groupby_columns',
'variableMeasured,observationAbout,observationDate,measurementMethod,unit',
'Columns to group data for diff analysis in the order (var,place,time etc.).'
)
flags.DEFINE_string(
'value_columns', 'value,unit',
'Columns with statvar value (unit etc.) for diff analysis.')
flags.DEFINE_string('value_columns', 'value,scalingFactor',
'Columns with statvar value for diff analysis.')

SAMPLE_COUNT = 3

Expand Down Expand Up @@ -69,13 +69,17 @@ class DatasetDiffer:
"""

def __init__(self, groupby_columns, value_columns):
def __init__(self, current_data, previous_data, output_location,
groupby_columns, value_columns):
self.current_data = current_data
self.previous_data = previous_data
self.output_location = output_location
self.groupby_columns = groupby_columns.split(',')
self.value_columns = value_columns.split(',')
self.variable_column = self.groupby_columns[0]
self.place_column = self.groupby_columns[1]
self.time_column = self.groupby_columns[2]
self.diff_column = '_diff_result'
self.diff_column = 'diff_result'

def _cleanup_data(self, df: pd.DataFrame):
for column in ['added', 'deleted', 'modified', 'same']:
Expand Down Expand Up @@ -208,43 +212,48 @@ def series_analysis(self,
axis=1)
return summary, result

def run_differ(self):
    """Runs the point and series diff analyses and writes the results.

    Loads the current and previous datasets via helper.load_data (passing
    self.output_location as its tmp_dir argument), processes them with
    self.process_data, then for each analysis sorts the detailed result by
    diff category and variable, prints the first 10 rows of the summary and
    result, and writes both as CSV files into self.output_location:
        point-analysis-summary.csv, point-analysis-results.csv,
        series-analysis-summary.csv, series-analysis-results.csv
    """
    # Use the location this instance was constructed with rather than
    # FLAGS, so the differ also works when built programmatically (flags
    # unparsed or pointing at a different folder). exist_ok=True replaces
    # the separate exists() check and avoids the check-then-create race.
    os.makedirs(self.output_location, exist_ok=True)

    logging.info('Loading data...')
    current_df = helper.load_data(self.current_data, self.output_location)
    previous_df = helper.load_data(self.previous_data, self.output_location)

    logging.info('Processing data...')
    in_data = self.process_data(previous_df, current_df)

    # Both analyses share the same sort/print/write steps; only the log
    # message, the analysis method and the output file prefix differ.
    analyses = (
        ('Point analysis:', self.point_analysis, 'point-analysis'),
        ('Series analysis:', self.series_analysis, 'series-analysis'),
    )
    for message, analyze, prefix in analyses:
        logging.info(message)
        summary, result = analyze(in_data)
        result.sort_values(by=[self.diff_column, self.variable_column],
                           inplace=True)
        print(summary.head(10))
        print(result.head(10))
        helper.write_data(summary, self.output_location,
                          prefix + '-summary.csv')
        helper.write_data(result, self.output_location,
                          prefix + '-results.csv')

    logging.info('Differ output written to folder: %s',
                 self.output_location)


def main(_):
'''Runs the differ.'''
differ = DatasetDiffer(FLAGS.groupby_columns, FLAGS.value_columns)

if not os.path.exists(FLAGS.output_location):
os.makedirs(FLAGS.output_location)
logging.info('Loading data...')
current_df = helper.load_data(FLAGS.current_data, FLAGS.output_location)
previous_df = helper.load_data(FLAGS.previous_data, FLAGS.output_location)

logging.info('Processing data...')
in_data = differ.process_data(previous_df, current_df)

logging.info('Point analysis:')
summary, result = differ.point_analysis(in_data)
result.sort_values(by=[differ.diff_column, differ.variable_column],
inplace=True)
print(summary.head(10))
print(result.head(10))
helper.write_data(summary, FLAGS.output_location,
'point-analysis-summary.csv')
helper.write_data(result, FLAGS.output_location,
'point-analysis-results.csv')

logging.info('Series analysis:')
summary, result = differ.series_analysis(in_data)
result.sort_values(by=[differ.diff_column, differ.variable_column],
inplace=True)
print(summary.head(10))
print(result.head(10))
helper.write_data(summary, FLAGS.output_location,
'series-analysis-summary.csv')
helper.write_data(result, FLAGS.output_location,
'series-analysis-results.csv')

logging.info('Differ output written to folder: %s', FLAGS.output_location)
differ = DatasetDiffer(FLAGS.current_data, FLAGS.previous_data,
FLAGS.output_location, FLAGS.groupby_columns,
FLAGS.value_columns)
differ.run_differ()


if __name__ == '__main__':
Expand Down
16 changes: 9 additions & 7 deletions tools/differ/differ_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,12 +33,14 @@ class TestDiffer(unittest.TestCase):
def test_diff_analysis(self):
groupby_columns = 'variableMeasured,observationAbout,observationDate'
value_columns = 'value'

differ = DatasetDiffer(groupby_columns, value_columns)
current = helper.load_mcf_file(
os.path.join(module_dir, 'test', 'current.mcf'))
previous = helper.load_mcf_file(
os.path.join(module_dir, 'test', 'previous.mcf'))
current_data = os.path.join(module_dir, 'test', 'current.mcf')
previous_data = os.path.join(module_dir, 'test', 'previous.mcf')
output_location = os.path.join(module_dir, 'test')

differ = DatasetDiffer(current_data, previous_data, output_location,
groupby_columns, value_columns)
current = helper.load_mcf_file(current_data)
previous = helper.load_mcf_file(previous_data)

in_data = differ.process_data(previous, current)
summary, result = differ.point_analysis(in_data)
Expand Down
32 changes: 19 additions & 13 deletions tools/differ/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pandas as pd
import re

from absl import logging
from google.cloud.storage import Client


Expand All @@ -14,35 +15,38 @@ def load_mcf_file(file: str) -> pd.DataFrame:
# nodes separated by a blank line
mcf_nodes_text = mcf_contents.split('\n\n')
# lines separated as property: constraint
mcf_line = re.compile(r'^(\w+): (.*)$')
mcf_line = re.compile(r'^(\w+)\s*:\s*(.*)$')
mcf_nodes = []
for node in mcf_nodes_text:
current_mcf_node = {}
for line in node.split('\n'):
parsed_line = mcf_line.match(line)
if parsed_line is not None:
current_mcf_node[parsed_line.group(1)] = parsed_line.group(2)
if current_mcf_node and current_mcf_node[
'typeOf'] == 'dcid:StatVarObservation':
mcf_nodes.append(current_mcf_node)
if current_mcf_node:
if current_mcf_node['typeOf'] == 'dcid:StatVarObservation':
mcf_nodes.append(current_mcf_node)
else:
logging.warning(
f'Ignoring node of type:{current_mcf_node["typeOf"]}')
df = pd.DataFrame(mcf_nodes)
return df


def load_mcf_files(path: str) -> pd.DataFrame:
""" Loads all sharded mcf files in the given directory and
returns a single combined dataframe."""
df = pd.DataFrame()
df_list = []
filenames = glob.glob(path + '.mcf')
for filename in filenames:
df2 = load_mcf_file(filename)
# Merge data frames, expects same headers
df = pd.concat([df, df2])
return df
df = load_mcf_file(filename)
df_list.append(df)
result = pd.concat(df_list, ignore_index=True)
return result


def write_data(df: pd.DataFrame, path: str, file: str):
""" Write a dataframe to a CSV file with the given path."""
""" Writes a dataframe to a CSV file with the given path."""
out_file = open(os.path.join(path, file), mode='w', encoding='utf-8')
df.to_csv(out_file, index=False, mode='w')
out_file.close()
Expand All @@ -52,6 +56,7 @@ def load_data(path: str, tmp_dir: str) -> pd.DataFrame:
""" Loads data from the given path and returns as a dataframe.
Args:
path: local or gcs path (single file or folder/* format)
tmp_dir: destination folder
Returns:
dataframe with the input data
"""
Expand All @@ -68,6 +73,7 @@ def get_gcs_data(uri: str, tmp_dir: str) -> str:
""" Downloads files form GCS and copies them to local.
Args:
uri: single file path or folder/* format
tmp_dir: destination folder
Returns:
path to the output file/folder
"""
Expand All @@ -77,12 +83,12 @@ def get_gcs_data(uri: str, tmp_dir: str) -> str:
if uri.endswith('*'):
blobs = client.list_blobs(bucket)
for blob in blobs:
path = os.path.join(os.getcwd(), tmp_dir, blob.name)
path = os.path.join(tmp_dir, blob.name.replace('/', '_'))
blob.download_to_filename(path)
return os.path.join(os.getcwd(), tmp_dir, '*')
return os.path.join(tmp_dir, '*')
else:
file_name = uri.split('/')[3]
blob = bucket.get_blob(file_name)
path = os.path.join(os.getcwd(), tmp_dir, file_name)
path = os.path.join(tmp_dir, blob.name.replace('/', '_'))
blob.download_to_filename(path)
return path
7 changes: 7 additions & 0 deletions tools/differ/test/results/point-analysis-results.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
variableMeasured,_diff_result,observationAbout,observationDate,size
dcid:Max_Concentration_AirPollutant_Ozone,added,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],"['""2024-09-24T12:00:00""']",1
dcid:Mean_Concentration_AirPollutant_CO,deleted,"['dcid:cpcpAq/Secretariat_Amaravati___APPCB', 'dcid:cpcpAq/Secretariat_Amaravati___APPCB']","['""2024-09-24T12:00:00""', '""2024-09-25T12:00:00""']",2
dcid:Min_Concentration_AirPollutant_Ozone,deleted,"['dcid:cpcpAq/Secretariat_Amaravati___APPCB', 'dcid:cpcpAq/Secretariat_Amaravati___IMD']","['""2024-09-24T12:00:00""', '""2024-09-24T12:00:00""']",2
dcid:Mean_Concentration_AirPollutant_CO,modified,['dcid:cpcpAq/Secretariat_Amaravati___IMD'],"['""2024-09-24T12:00:00""']",1
dcid:Mean_Concentration_AirPollutant_Ozone,modified,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],"['""2024-09-24T12:00:00""']",1
dcid:Mean_Concentration_AirPollutant_CO,same,['dcid:cpcpAq/Secretariat_Amaravati___IMD'],"['""2024-09-25T12:00:00""']",1
5 changes: 5 additions & 0 deletions tools/differ/test/results/point-analysis-summary.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
variableMeasured,added,deleted,modified,same,total
dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1
dcid:Mean_Concentration_AirPollutant_CO,0,2,1,1,4
dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1
dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2
6 changes: 6 additions & 0 deletions tools/differ/test/results/series-analysis-results.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
variableMeasured,_diff_result,observationAbout,size
dcid:Max_Concentration_AirPollutant_Ozone,added,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],1
dcid:Mean_Concentration_AirPollutant_CO,deleted,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],1
dcid:Min_Concentration_AirPollutant_Ozone,deleted,"['dcid:cpcpAq/Secretariat_Amaravati___IMD', 'dcid:cpcpAq/Secretariat_Amaravati___APPCB']",2
dcid:Mean_Concentration_AirPollutant_CO,modified,['dcid:cpcpAq/Secretariat_Amaravati___IMD'],1
dcid:Mean_Concentration_AirPollutant_Ozone,modified,['dcid:cpcpAq/Secretariat_Amaravati___APPCB'],1
5 changes: 5 additions & 0 deletions tools/differ/test/results/series-analysis-summary.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
variableMeasured,added,deleted,modified,same,total
dcid:Max_Concentration_AirPollutant_Ozone,1,0,0,0,1
dcid:Mean_Concentration_AirPollutant_CO,0,1,1,0,2
dcid:Mean_Concentration_AirPollutant_Ozone,0,0,1,0,1
dcid:Min_Concentration_AirPollutant_Ozone,0,2,0,0,2

0 comments on commit 3eb4188

Please sign in to comment.