From 4f620134b18e8ccb75b616194c755caa65c43306 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 6 Dec 2023 00:40:16 +0530 Subject: [PATCH 1/3] set defaults to single year, limited counties to temporarily have script finish in <1 for autorefresh test --- scripts/us_usda/quickstats/process.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/scripts/us_usda/quickstats/process.py b/scripts/us_usda/quickstats/process.py index 90768e668c..3a8e190724 100644 --- a/scripts/us_usda/quickstats/process.py +++ b/scripts/us_usda/quickstats/process.py @@ -64,14 +64,18 @@ flags.DEFINE_string(_USDA_API_KEY, None, 'USDA quickstats API key.') flags.DEFINE_integer( 'start_year', - os.getenv('start_year', 2000), - 'Year from whihc data is processed.', + os.getenv('start_year', datetime.now().year), + 'Year from which data is processed.', ) flags.DEFINE_integer( 'num_counties', - os.getenv('num_counties', 5000), + os.getenv('num_counties', 100), 'number of counties for which data is processed.', ) +flags.DEFINE_string( + 'output_dir', + 'output', + 'Output firectory for generated files.') def process_survey_data(year, svs, out_dir): @@ -295,7 +299,7 @@ def get_multiple_years(): start = datetime.now() print('Start', start) - out_dir = 'output' + out_dir = _FLAGS.output_dir svs = load_svs() years = range(_FLAGS.start_year, datetime.now().year + 1) for year in years: From e8ba9282e75354bc1a2054738972a024c64be674 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 6 Dec 2023 12:23:14 +0530 Subject: [PATCH 2/3] lint fix --- scripts/us_usda/quickstats/process.py | 343 +++++++++++++------------- 1 file changed, 174 insertions(+), 169 deletions(-) diff --git a/scripts/us_usda/quickstats/process.py b/scripts/us_usda/quickstats/process.py index 3a8e190724..f8e28d30af 100644 --- a/scripts/us_usda/quickstats/process.py +++ b/scripts/us_usda/quickstats/process.py @@ -73,145 +73,143 @@ 'number of counties for which data is processed.', ) flags.DEFINE_string( - 'output_dir', - 'output', - 'Output firectory for generated files.') + 'output_dir', 'output', 'Output firectory for generated files.' +) def process_survey_data(year, svs, out_dir): - start = datetime.now() - print('Start', year, '=', start) + start = datetime.now() + print('Start', year, '=', start) - os.makedirs(get_parts_dir(out_dir, year), exist_ok=True) - os.makedirs(get_response_dir(out_dir, year), exist_ok=True) + os.makedirs(get_parts_dir(out_dir, year), exist_ok=True) + os.makedirs(get_response_dir(out_dir, year), exist_ok=True) - print('Processing survey data for year', year) + print('Processing survey data for year', year) - print('Getting county names') - county_names = get_param_values('county_name') - county_names = county_names[:_FLAGS.num_counties] - print('# counties =', len(county_names)) + print('Getting county names') + county_names = get_param_values('county_name') + county_names = county_names[: _FLAGS.num_counties] + print('# counties =', len(county_names)) - pool_size = max(2, multiprocessing.cpu_count() - 1) + pool_size = max(2, multiprocessing.cpu_count() - 1) - with multiprocessing.Pool(pool_size) as pool: - pool.starmap( - fetch_and_write, - zip(county_names, repeat(year), repeat(svs), repeat(out_dir)), - ) + with multiprocessing.Pool(pool_size) as pool: + pool.starmap( + fetch_and_write, + zip(county_names, repeat(year), repeat(svs), repeat(out_dir)), + ) - write_aggregate_csv(year, out_dir) + write_aggregate_csv(year, out_dir) - end = datetime.now() - print('End', year, '=', end) - print('Duration', year, '=', str(end - start)) + end = datetime.now() + print('End', year, '=', end) + print('Duration', year, '=', str(end - start)) def get_parts_dir(out_dir, year): - return f'{out_dir}/parts/{year}' + return f'{out_dir}/parts/{year}' def get_response_dir(out_dir, year): - return f'{out_dir}/response/{year}' + return f'{out_dir}/response/{year}' def get_response_file_path(out_dir, year, county): - return f'{get_response_dir(out_dir, year)}/{county}.json' + return f'{get_response_dir(out_dir, year)}/{county}.json' def get_year_csv_file_path(out_dir, year): - return f'{out_dir}/ag-{year}.csv' + return f'{out_dir}/ag-{year}.csv' def write_aggregate_csv(year, out_dir): - parts_dir = get_parts_dir(out_dir, year) - part_files = os.listdir(parts_dir) - out_file = get_year_csv_file_path(out_dir, year) + parts_dir = get_parts_dir(out_dir, year) + part_files = os.listdir(parts_dir) + out_file = get_year_csv_file_path(out_dir, year) - print('Writing aggregate CSV', out_file) + print('Writing aggregate CSV', out_file) - with open(out_file, 'w', newline='') as out: - csv_writer = csv.DictWriter(out, - fieldnames=CSV_COLUMNS, - lineterminator='\n') - csv_writer.writeheader() - for part_file in part_files: - if part_file.endswith('.csv'): - with open(f'{parts_dir}/{part_file}', 'r') as part: - csv_writer.writerows(csv.DictReader(part)) + with open(out_file, 'w', newline='') as out: + csv_writer = csv.DictWriter( + out, fieldnames=CSV_COLUMNS, lineterminator='\n' + ) + csv_writer.writeheader() + for part_file in part_files: + if part_file.endswith('.csv'): + with open(f'{parts_dir}/{part_file}', 'r') as part: + csv_writer.writerows(csv.DictReader(part)) def write_consolidated_csv(years, out_dir): - out_file = f'{out_dir}/consolidated.csv' + out_file = f'{out_dir}/consolidated.csv' - print('Writing consolidated CSV', out_file) + print('Writing consolidated CSV', out_file) - with open(out_file, 'w', newline='') as out: - csv_writer = csv.DictWriter(out, - fieldnames=CSV_COLUMNS, - lineterminator='\n') - csv_writer.writeheader() - for year in years: - with open(get_year_csv_file_path(out_dir, year), 'r') as part: - csv_writer.writerows(csv.DictReader(part)) + with open(out_file, 'w', newline='') as out: + csv_writer = csv.DictWriter( + out, fieldnames=CSV_COLUMNS, lineterminator='\n' + ) + csv_writer.writeheader() + for year in years: + with open(get_year_csv_file_path(out_dir, year), 'r') as part: + csv_writer.writerows(csv.DictReader(part)) def fetch_and_write(county_name, year, svs, out_dir): - out_file = ( - f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv" - ) - api_data = get_survey_county_data(year, county_name, out_dir) - county_csv_rows = to_csv_rows(api_data, svs) - print( - 'Writing', - len(county_csv_rows), - 'rows for county', - county_name, - 'to file', - out_file, - ) - with open(out_file, 'w', newline='') as out: - write_csv(out, county_csv_rows) + out_file = ( + f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv" + ) + api_data = get_survey_county_data(year, county_name, out_dir) + county_csv_rows = to_csv_rows(api_data, svs) + print( + 'Writing', + len(county_csv_rows), + 'rows for county', + county_name, + 'to file', + out_file, + ) + with open(out_file, 'w', newline='') as out: + write_csv(out, county_csv_rows) def get_survey_county_data(year, county, out_dir): - print('Getting', year, 'survey data for county', county) - - response_file = get_response_file_path(out_dir, year, county) - if os.path.exists(response_file): - print('Reading response from file', response_file) - with open(response_file, 'r') as f: - response = json.load(f) - else: - params = { - 'key': get_usda_api_key(), - 'source_desc': 'SURVEY', - 'year': year, - 'county_name': county, - } - response = get_data(params) - with open(response_file, 'w') as f: - print('Writing response to file', response_file) - json.dump(response, f, indent=2) - - if 'data' not in response: - eprint('No api records found for county', county) - return {'data': []} - - print('# api records for', county, '=', len(response['data'])) - return response + print('Getting', year, 'survey data for county', county) + + response_file = get_response_file_path(out_dir, year, county) + if os.path.exists(response_file): + print('Reading response from file', response_file) + with open(response_file, 'r') as f: + response = json.load(f) + else: + params = { + 'key': get_usda_api_key(), + 'source_desc': 'SURVEY', + 'year': year, + 'county_name': county, + } + response = get_data(params) + with open(response_file, 'w') as f: + print('Writing response to file', response_file) + json.dump(response, f, indent=2) + + if 'data' not in response: + eprint('No api records found for county', county) + return {'data': []} + + print('# api records for', county, '=', len(response['data'])) + return response @limits(calls=10, period=60) def get_data(params): - return requests.get(f'{API_BASE}/api_GET', params=params).json() + return requests.get(f'{API_BASE}/api_GET', params=params).json() def get_param_values(param): - params = {'key': get_usda_api_key(), 'param': param} - response = requests.get(f'{API_BASE}/get_param_values', - params=params).json() - return [] if param not in response else response[param] + params = {'key': get_usda_api_key(), 'param': param} + response = requests.get(f'{API_BASE}/get_param_values', params=params).json() + return [] if param not in response else response[param] """Converts a quickstats data row to a DC CSV row. @@ -224,116 +222,123 @@ def get_param_values(param): def to_csv_row(data_row, svs): - name = data_row['short_desc'] - if (data_row['domaincat_desc'] and - data_row['domaincat_desc'] != 'NOT SPECIFIED'): - name = f"{name}%%{data_row['domaincat_desc']}" - - if name not in svs: - eprint('SKIPPED, No SV mapped for', name) - return None - - county_code = data_row['county_code'] - if county_code in SKIPPED_COUNTY_CODES: - eprint('SKIPPED, Unsupported county code', county_code) - return None - - value = ((data_row['value'] if 'value' in data_row else - data_row['Value']).strip().replace(',', '')) - if value in SKIPPED_VALUES: - eprint('SKIPPED, Invalid value', f"'{value}'", 'for', name) - return None - value = int(value) - - observation_about = ( - f"dcid:geoId/{data_row['state_fips_code']}{county_code}" - if data_row['state_fips_code'] else 'dcid:country/USA') - - sv = svs[name] - - return { - 'variableMeasured': sv['sv'], - 'observationDate': data_row['year'], - 'observationAbout': observation_about, - 'value': value, - 'unit': sv['unit'], - } + name = data_row['short_desc'] + if ( + data_row['domaincat_desc'] + and data_row['domaincat_desc'] != 'NOT SPECIFIED' + ): + name = f"{name}%%{data_row['domaincat_desc']}" + + if name not in svs: + eprint('SKIPPED, No SV mapped for', name) + return None + + county_code = data_row['county_code'] + if county_code in SKIPPED_COUNTY_CODES: + eprint('SKIPPED, Unsupported county code', county_code) + return None + + value = ( + (data_row['value'] if 'value' in data_row else data_row['Value']) + .strip() + .replace(',', '') + ) + if value in SKIPPED_VALUES: + eprint('SKIPPED, Invalid value', f"'{value}'", 'for', name) + return None + value = int(value) + + observation_about = ( + f"dcid:geoId/{data_row['state_fips_code']}{county_code}" + if data_row['state_fips_code'] + else 'dcid:country/USA' + ) + + sv = svs[name] + + return { + 'variableMeasured': sv['sv'], + 'observationDate': data_row['year'], + 'observationAbout': observation_about, + 'value': value, + 'unit': sv['unit'], + } def to_csv_rows(api_data, svs): - csv_rows = [] + csv_rows = [] - for data_row in api_data['data']: - csv_row = to_csv_row(data_row, svs) - if csv_row: - csv_rows.append(csv_row) + for data_row in api_data['data']: + csv_row = to_csv_row(data_row, svs) + if csv_row: + csv_rows.append(csv_row) - return csv_rows + return csv_rows def load_svs(): - svs = {} - with open('sv.csv', newline='') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - svs[row['name']] = row - return svs + svs = {} + with open('sv.csv', newline='') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + svs[row['name']] = row + return svs def write_csv(out, rows): - writer = csv.DictWriter(out, fieldnames=CSV_COLUMNS, lineterminator='\n') - writer.writeheader() - writer.writerows(rows) + writer = csv.DictWriter(out, fieldnames=CSV_COLUMNS, lineterminator='\n') + writer.writeheader() + writer.writerows(rows) def eprint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) + print(*args, file=sys.stderr, **kwargs) def get_all_counties(): - svs = load_svs() - process_survey_data(2023, svs, 'output') + svs = load_svs() + process_survey_data(2023, svs, 'output') def get_multiple_years(): - start = datetime.now() - print('Start', start) + start = datetime.now() + print('Start', start) - out_dir = _FLAGS.output_dir - svs = load_svs() - years = range(_FLAGS.start_year, datetime.now().year + 1) - for year in years: - process_survey_data(year, svs, out_dir) + out_dir = _FLAGS.output_dir + svs = load_svs() + years = range(_FLAGS.start_year, datetime.now().year + 1) + for year in years: + process_survey_data(year, svs, out_dir) - write_consolidated_csv(years, out_dir) + write_consolidated_csv(years, out_dir) - end = datetime.now() - print('End', end) - print('Duration', str(end - start)) + end = datetime.now() + print('End', end) + print('Duration', str(end - start)) def get_cloud_config(): - print('Getting cloud config.') - storage_client = storage.Client(_GCS_PROJECT_ID) - bucket = storage_client.bucket(_GCS_BUCKET) - blob = bucket.blob(_GCS_FILE_PATH) - return json.loads(blob.download_as_string(client=None)) + print('Getting cloud config.') + storage_client = storage.Client(_GCS_PROJECT_ID) + bucket = storage_client.bucket(_GCS_BUCKET) + blob = bucket.blob(_GCS_FILE_PATH) + return json.loads(blob.download_as_string(client=None)) def load_usda_api_key(): - if get_usda_api_key() is None: - _FLAGS.set_default(_USDA_API_KEY, get_cloud_config()[_USDA_API_KEY]) + if get_usda_api_key() is None: + _FLAGS.set_default(_USDA_API_KEY, get_cloud_config()[_USDA_API_KEY]) def get_usda_api_key(): - return _FLAGS.usda_api_key + return _FLAGS.usda_api_key def main(_): - load_usda_api_key() - print('USDA API key', get_usda_api_key()) - get_multiple_years() + load_usda_api_key() + print('USDA API key', get_usda_api_key()) + get_multiple_years() if __name__ == '__main__': - app.run(main) + app.run(main) From fc7fb03ae98097770fd97ae7e9e7d1d42349e681 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 6 Dec 2023 12:41:35 +0530 Subject: [PATCH 3/3] lint fix --- scripts/us_usda/quickstats/process.py | 346 +++++++++++++------------- 1 file changed, 170 insertions(+), 176 deletions(-) diff --git a/scripts/us_usda/quickstats/process.py b/scripts/us_usda/quickstats/process.py index f8e28d30af..8520d0677c 100644 --- a/scripts/us_usda/quickstats/process.py +++ b/scripts/us_usda/quickstats/process.py @@ -64,7 +64,8 @@ flags.DEFINE_string(_USDA_API_KEY, None, 'USDA quickstats API key.') flags.DEFINE_integer( 'start_year', - os.getenv('start_year', datetime.now().year), + os.getenv('start_year', + datetime.now().year), 'Year from which data is processed.', ) flags.DEFINE_integer( @@ -72,144 +73,144 @@ os.getenv('num_counties', 100), 'number of counties for which data is processed.', ) -flags.DEFINE_string( - 'output_dir', 'output', 'Output firectory for generated files.' -) +flags.DEFINE_string('output_dir', 'output', + 'Output firectory for generated files.') def process_survey_data(year, svs, out_dir): - start = datetime.now() - print('Start', year, '=', start) + start = datetime.now() + print('Start', year, '=', start) - os.makedirs(get_parts_dir(out_dir, year), exist_ok=True) - os.makedirs(get_response_dir(out_dir, year), exist_ok=True) + os.makedirs(get_parts_dir(out_dir, year), exist_ok=True) + os.makedirs(get_response_dir(out_dir, year), exist_ok=True) - print('Processing survey data for year', year) + print('Processing survey data for year', year) - print('Getting county names') - county_names = get_param_values('county_name') - county_names = county_names[: _FLAGS.num_counties] - print('# counties =', len(county_names)) + print('Getting county names') + county_names = get_param_values('county_name') + county_names = county_names[:_FLAGS.num_counties] + print('# counties =', len(county_names)) - pool_size = max(2, multiprocessing.cpu_count() - 1) + pool_size = max(2, multiprocessing.cpu_count() - 1) - with multiprocessing.Pool(pool_size) as pool: - pool.starmap( - fetch_and_write, - zip(county_names, repeat(year), repeat(svs), repeat(out_dir)), - ) + with multiprocessing.Pool(pool_size) as pool: + pool.starmap( + fetch_and_write, + zip(county_names, repeat(year), repeat(svs), repeat(out_dir)), + ) - write_aggregate_csv(year, out_dir) + write_aggregate_csv(year, out_dir) - end = datetime.now() - print('End', year, '=', end) - print('Duration', year, '=', str(end - start)) + end = datetime.now() + print('End', year, '=', end) + print('Duration', year, '=', str(end - start)) def get_parts_dir(out_dir, year): - return f'{out_dir}/parts/{year}' + return f'{out_dir}/parts/{year}' def get_response_dir(out_dir, year): - return f'{out_dir}/response/{year}' + return f'{out_dir}/response/{year}' def get_response_file_path(out_dir, year, county): - return f'{get_response_dir(out_dir, year)}/{county}.json' + return f'{get_response_dir(out_dir, year)}/{county}.json' def get_year_csv_file_path(out_dir, year): - return f'{out_dir}/ag-{year}.csv' + return f'{out_dir}/ag-{year}.csv' def write_aggregate_csv(year, out_dir): - parts_dir = get_parts_dir(out_dir, year) - part_files = os.listdir(parts_dir) - out_file = get_year_csv_file_path(out_dir, year) + parts_dir = get_parts_dir(out_dir, year) + part_files = os.listdir(parts_dir) + out_file = get_year_csv_file_path(out_dir, year) - print('Writing aggregate CSV', out_file) + print('Writing aggregate CSV', out_file) - with open(out_file, 'w', newline='') as out: - csv_writer = csv.DictWriter( - out, fieldnames=CSV_COLUMNS, lineterminator='\n' - ) - csv_writer.writeheader() - for part_file in part_files: - if part_file.endswith('.csv'): - with open(f'{parts_dir}/{part_file}', 'r') as part: - csv_writer.writerows(csv.DictReader(part)) + with open(out_file, 'w', newline='') as out: + csv_writer = csv.DictWriter(out, + fieldnames=CSV_COLUMNS, + lineterminator='\n') + csv_writer.writeheader() + for part_file in part_files: + if part_file.endswith('.csv'): + with open(f'{parts_dir}/{part_file}', 'r') as part: + csv_writer.writerows(csv.DictReader(part)) def write_consolidated_csv(years, out_dir): - out_file = f'{out_dir}/consolidated.csv' + out_file = f'{out_dir}/consolidated.csv' - print('Writing consolidated CSV', out_file) + print('Writing consolidated CSV', out_file) - with open(out_file, 'w', newline='') as out: - csv_writer = csv.DictWriter( - out, fieldnames=CSV_COLUMNS, lineterminator='\n' - ) - csv_writer.writeheader() - for year in years: - with open(get_year_csv_file_path(out_dir, year), 'r') as part: - csv_writer.writerows(csv.DictReader(part)) + with open(out_file, 'w', newline='') as out: + csv_writer = csv.DictWriter(out, + fieldnames=CSV_COLUMNS, + lineterminator='\n') + csv_writer.writeheader() + for year in years: + with open(get_year_csv_file_path(out_dir, year), 'r') as part: + csv_writer.writerows(csv.DictReader(part)) def fetch_and_write(county_name, year, svs, out_dir): - out_file = ( - f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv" - ) - api_data = get_survey_county_data(year, county_name, out_dir) - county_csv_rows = to_csv_rows(api_data, svs) - print( - 'Writing', - len(county_csv_rows), - 'rows for county', - county_name, - 'to file', - out_file, - ) - with open(out_file, 'w', newline='') as out: - write_csv(out, county_csv_rows) + out_file = ( + f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv" + ) + api_data = get_survey_county_data(year, county_name, out_dir) + county_csv_rows = to_csv_rows(api_data, svs) + print( + 'Writing', + len(county_csv_rows), + 'rows for county', + county_name, + 'to file', + out_file, + ) + with open(out_file, 'w', newline='') as out: + write_csv(out, county_csv_rows) def get_survey_county_data(year, county, out_dir): - print('Getting', year, 'survey data for county', county) - - response_file = get_response_file_path(out_dir, year, county) - if os.path.exists(response_file): - print('Reading response from file', response_file) - with open(response_file, 'r') as f: - response = json.load(f) - else: - params = { - 'key': get_usda_api_key(), - 'source_desc': 'SURVEY', - 'year': year, - 'county_name': county, - } - response = get_data(params) - with open(response_file, 'w') as f: - print('Writing response to file', response_file) - json.dump(response, f, indent=2) - - if 'data' not in response: - eprint('No api records found for county', county) - return {'data': []} - - print('# api records for', county, '=', len(response['data'])) - return response + print('Getting', year, 'survey data for county', county) + + response_file = get_response_file_path(out_dir, year, county) + if os.path.exists(response_file): + print('Reading response from file', response_file) + with open(response_file, 'r') as f: + response = json.load(f) + else: + params = { + 'key': get_usda_api_key(), + 'source_desc': 'SURVEY', + 'year': year, + 'county_name': county, + } + response = get_data(params) + with open(response_file, 'w') as f: + print('Writing response to file', response_file) + json.dump(response, f, indent=2) + + if 'data' not in response: + eprint('No api records found for county', county) + return {'data': []} + + print('# api records for', county, '=', len(response['data'])) + return response @limits(calls=10, period=60) def get_data(params): - return requests.get(f'{API_BASE}/api_GET', params=params).json() + return requests.get(f'{API_BASE}/api_GET', params=params).json() def get_param_values(param): - params = {'key': get_usda_api_key(), 'param': param} - response = requests.get(f'{API_BASE}/get_param_values', params=params).json() - return [] if param not in response else response[param] + params = {'key': get_usda_api_key(), 'param': param} + response = requests.get(f'{API_BASE}/get_param_values', + params=params).json() + return [] if param not in response else response[param] """Converts a quickstats data row to a DC CSV row. @@ -222,123 +223,116 @@ def get_param_values(param): def to_csv_row(data_row, svs): - name = data_row['short_desc'] - if ( - data_row['domaincat_desc'] - and data_row['domaincat_desc'] != 'NOT SPECIFIED' - ): - name = f"{name}%%{data_row['domaincat_desc']}" - - if name not in svs: - eprint('SKIPPED, No SV mapped for', name) - return None - - county_code = data_row['county_code'] - if county_code in SKIPPED_COUNTY_CODES: - eprint('SKIPPED, Unsupported county code', county_code) - return None - - value = ( - (data_row['value'] if 'value' in data_row else data_row['Value']) - .strip() - .replace(',', '') - ) - if value in SKIPPED_VALUES: - eprint('SKIPPED, Invalid value', f"'{value}'", 'for', name) - return None - value = int(value) - - observation_about = ( - f"dcid:geoId/{data_row['state_fips_code']}{county_code}" - if data_row['state_fips_code'] - else 'dcid:country/USA' - ) - - sv = svs[name] - - return { - 'variableMeasured': sv['sv'], - 'observationDate': data_row['year'], - 'observationAbout': observation_about, - 'value': value, - 'unit': sv['unit'], - } + name = data_row['short_desc'] + if (data_row['domaincat_desc'] and + data_row['domaincat_desc'] != 'NOT SPECIFIED'): + name = f"{name}%%{data_row['domaincat_desc']}" + + if name not in svs: + eprint('SKIPPED, No SV mapped for', name) + return None + + county_code = data_row['county_code'] + if county_code in SKIPPED_COUNTY_CODES: + eprint('SKIPPED, Unsupported county code', county_code) + return None + + value = ((data_row['value'] if 'value' in data_row else + data_row['Value']).strip().replace(',', '')) + if value in SKIPPED_VALUES: + eprint('SKIPPED, Invalid value', f"'{value}'", 'for', name) + return None + value = int(value) + + observation_about = ( + f"dcid:geoId/{data_row['state_fips_code']}{county_code}" + if data_row['state_fips_code'] else 'dcid:country/USA') + + sv = svs[name] + + return { + 'variableMeasured': sv['sv'], + 'observationDate': data_row['year'], + 'observationAbout': observation_about, + 'value': value, + 'unit': sv['unit'], + } def to_csv_rows(api_data, svs): - csv_rows = [] + csv_rows = [] - for data_row in api_data['data']: - csv_row = to_csv_row(data_row, svs) - if csv_row: - csv_rows.append(csv_row) + for data_row in api_data['data']: + csv_row = to_csv_row(data_row, svs) + if csv_row: + csv_rows.append(csv_row) - return csv_rows + return csv_rows def load_svs(): - svs = {} - with open('sv.csv', newline='') as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - svs[row['name']] = row - return svs + svs = {} + with open('sv.csv', newline='') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + svs[row['name']] = row + return svs def write_csv(out, rows): - writer = csv.DictWriter(out, fieldnames=CSV_COLUMNS, lineterminator='\n') - writer.writeheader() - writer.writerows(rows) + writer = csv.DictWriter(out, fieldnames=CSV_COLUMNS, lineterminator='\n') + writer.writeheader() + writer.writerows(rows) def eprint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) + print(*args, file=sys.stderr, **kwargs) def get_all_counties(): - svs = load_svs() - process_survey_data(2023, svs, 'output') + svs = load_svs() + process_survey_data(2023, svs, 'output') def get_multiple_years(): - start = datetime.now() - print('Start', start) + start = datetime.now() + print('Start', start) - out_dir = _FLAGS.output_dir - svs = load_svs() - years = range(_FLAGS.start_year, datetime.now().year + 1) - for year in years: - process_survey_data(year, svs, out_dir) + out_dir = _FLAGS.output_dir + svs = load_svs() + years = range(_FLAGS.start_year, datetime.now().year + 1) + for year in years: + process_survey_data(year, svs, out_dir) - write_consolidated_csv(years, out_dir) + write_consolidated_csv(years, out_dir) - end = datetime.now() - print('End', end) - print('Duration', str(end - start)) + end = datetime.now() + print('End', end) + print('Duration', str(end - start)) def get_cloud_config(): - print('Getting cloud config.') - storage_client = storage.Client(_GCS_PROJECT_ID) - bucket = storage_client.bucket(_GCS_BUCKET) - blob = bucket.blob(_GCS_FILE_PATH) - return json.loads(blob.download_as_string(client=None)) + print('Getting cloud config.') + storage_client = storage.Client(_GCS_PROJECT_ID) + bucket = storage_client.bucket(_GCS_BUCKET) + blob = bucket.blob(_GCS_FILE_PATH) + return json.loads(blob.download_as_string(client=None)) def load_usda_api_key(): - if get_usda_api_key() is None: - _FLAGS.set_default(_USDA_API_KEY, get_cloud_config()[_USDA_API_KEY]) + if get_usda_api_key() is None: + _FLAGS.set_default(_USDA_API_KEY, get_cloud_config()[_USDA_API_KEY]) def get_usda_api_key(): - return _FLAGS.usda_api_key + return _FLAGS.usda_api_key def main(_): - load_usda_api_key() - print('USDA API key', get_usda_api_key()) - get_multiple_years() + load_usda_api_key() + print('USDA API key', get_usda_api_key()) + get_multiple_years() if __name__ == '__main__': - app.run(main) + app.run(main)