add a bunch of timing print statements; some cleanup
mradamcox committed May 14, 2020
1 parent 52473a7 commit e2691cb
Showing 1 changed file with 50 additions and 17 deletions.
main.py (67 changes: 50 additions & 17 deletions)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import os
 import argparse
+from datetime import datetime
 
 import header_mapping as hm
 from operators import process_csv
@@ -9,9 +10,11 @@
 from ingester import Ingester
 
 
-print("Processing instantaneous tables...")
 def process_instantaneous(dry_run=False, datadir=None, verbose=False):
 
+    start = datetime.now()
+    print("\nSTARTING process_instantaneous()")
+
     if datadir is None:
         datadir = "/tmp"
 
@@ -21,21 +24,25 @@ def process_instantaneous(dry_run=False, datadir=None, verbose=False):
     # should be done to consolidate the sftp and file management process.
     # the only difference is that now Ingester().get_files_from_sftp() is used.
     print("Getting latest HOS file from SFTP...")
+    a = datetime.now()
     file_details, all_filenames = ingester.get_files_from_sftp(target_dir=datadir)
-    print("Finished.")
+    print(f"Finished: {datetime.now() - a}")
 
     latest_file_details = file_details[0]
 
     # run a validation on the latest file (the one that will be used for instantaneous)
     # validate_csv() will raise an exception if it fails. use raise_exception=False
     # to run it quietly.
+    a = datetime.now()
     v = CSVValidator("HOS")
     fpath = os.path.join(latest_file_details['dir'], latest_file_details['filename'])
     v.validate_csv(fpath)
+    print(f"latest CSV validated: {datetime.now() - a}")
 
     # validation passed on downloaded file, now do all processing
 
     # Public-only data
+    a = datetime.now()
     public_processed_file_details = process_csv(
         [latest_file_details],
         output_prefix="public_processed_HOS_",
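A note on the pattern these hunks introduce: subtracting two datetimes yields a timedelta, and a timedelta formatted in an f-string renders as H:MM:SS.ffffff, which is exactly what the new prints emit. A minimal standalone illustration (the sleep is only to make the delta visible):

    from datetime import datetime
    import time

    a = datetime.now()
    time.sleep(1.5)
    # str() of a timedelta looks like 0:00:01.501234
    print(f"Finished: {datetime.now() - a}")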
@@ -44,63 +51,94 @@
     )
     public_processed_filename = public_processed_file_details[0]["processed_filename"]
     public_processed_dir = public_processed_file_details[0]["output_dir"]
+    print(f"processed latest CSV (only public columns): {datetime.now() - a}")
 
     # Full data
+    a = datetime.now()
     processed_file_details = process_csv(
         [latest_file_details],
         output_dir=datadir
     )
+    print(f"processed latest CSV (full): {datetime.now() - a}")
 
     processed_filename = processed_file_details[0]["processed_filename"]
     processed_dir = processed_file_details[0]["output_dir"]
 
     print(f"Finished processing {datadir}/{latest_file_details['filename']}, file is {processed_dir}/{processed_filename}")
 
     # process csv to update the non-public hospital table
+    a = datetime.now()
     ingester.process_hospital(processed_dir, processed_filename, public=False)
+    print(f"process hospital (full): {datetime.now() - a}")
 
     # process csv to update the public hospital table
+    a = datetime.now()
     ingester.process_hospital(public_processed_dir, public_processed_filename)
+    print(f"process hospital (public): {datetime.now() - a}")
 
     # process supplies
+    a = datetime.now()
     ingester.process_supplies(processed_dir, processed_filename)
+    print(f"process supplies: {datetime.now() - a}")
 
+    a = datetime.now()
     ingester.process_county_summaries(processed_dir, processed_filename)
-    print("Finished processing instantaneous tables.")
+    print(f"process county summaries: {datetime.now() - a}")
+    print(f"FINISHED process_instantaneous(): {datetime.now() - start}")
 
 def process_historical(dry_run=False, datadir=None, make_historical_csv=False, verbose=False):
 
+    print("\nSTARTING process_historical()")
+    start = datetime.now()
     if datadir is None:
         datadir = "/tmp"
 
     ingester = Ingester(dry_run, verbose=verbose)
 
-    files_to_not_sftp = ingester.gis.get_already_processed_files("summary_table")
+    a = datetime.now()
+    files_to_not_sftp = ingester.get_already_processed_files("summary_table")
+    print(f"determined already processed files (summary_table): {datetime.now() - a}")
 
+    a = datetime.now()
     file_details, all_filenames = ingester.get_files_from_sftp(target_dir=datadir, only_latest=False, filenames_to_ignore=files_to_not_sftp)
+    print(f"downloaded files: {datetime.now() - a}")
 
     if len(file_details) == 0:
-        print("No new files to process for historical summary table data.")
+        print(" No new files to process for historical summary table data.")
     else:
+        a = datetime.now()
         processed_file_details = process_csv(file_details, output_dir=datadir)
+        print(f"process_csv(): {datetime.now() - a}")
         processed_dir = processed_file_details[0]["output_dir"]
+        a = datetime.now()
         ingester.process_summaries(processed_dir, processed_file_details, make_historical_csv=make_historical_csv)
+        print(f"process_summaries(): {datetime.now() - a}")
 
+    a = datetime.now()
+    files_to_not_sftp = ingester.get_already_processed_files("full_historical_table")
+    print(f"determined already processed files (full_historical_table): {datetime.now() - a}")
+
-    files_to_not_sftp = ingester.gis.get_already_processed_files("full_historical_table")
-
     if make_historical_csv:
         # setting files_to_not_sftp to an empty list ensures we rebuild the full historical table
         files_to_not_sftp = []
 
+    a = datetime.now()
     file_details, all_filenames = ingester.get_files_from_sftp(target_dir=datadir, only_latest=False, filenames_to_ignore=files_to_not_sftp)
+    print(f"downloaded files: {datetime.now() - a}")
 
     if len(file_details) == 0:
-        print("No new files to process for historical data.")
+        print(" No new files to process for historical data.")
     else:
+        a = datetime.now()
         processed_file_details = process_csv(file_details, output_dir=datadir)
+        print(f"process_csv(): {datetime.now() - a}")
         processed_dir = processed_file_details[0]["output_dir"]
+        a = datetime.now()
         ingester.process_historical_hos(processed_dir, processed_file_details, make_historical_csv=make_historical_csv)
+        print(f"process_historical_hos: {datetime.now() - a}")
 
 
-    print("Finished processing historical tables.")
+    print(f"FINISHED process_historical(): {datetime.now()-start}")
 
 
 def process_canary_features(dry_run=False, datadir=None, verbose=False):
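Each timed step above repeats the same pair: stamp a = datetime.now() before the call, print datetime.now() - a after it. Not part of this commit, but a context manager is one way that repetition could be collapsed; a sketch, with timed() as a hypothetical helper:

    from contextlib import contextmanager
    from datetime import datetime

    @contextmanager
    def timed(label):
        # prints "<label>: H:MM:SS.ffffff", the same shape as the commit's output
        start = datetime.now()
        yield
        print(f"{label}: {datetime.now() - start}")

    # runnable demo; in main.py this would wrap calls such as
    # ingester.process_supplies(processed_dir, processed_filename)
    with timed("demo step"):
        sum(range(10_000_000))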
@@ -109,8 +147,6 @@ def process_canary_features(dry_run=False, datadir=None, verbose=False):
         datadir = "/tmp"
 
     ingester = Ingester(dry_run, verbose=verbose)
-    agol_connection = AGOLConnection()
-    ingester.set_gis(agol_connection)
 
     print("XXX not doing historical averages yet")
     historical_gis_item_id = "3b39827f6f804c33b9b1114b5aa1d6b6" # v4
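Removing the AGOLConnection wiring here matches the change above from ingester.gis.get_already_processed_files(...) to ingester.get_already_processed_files(...): Ingester now appears to own its connection and expose a wrapper method rather than having callers inject one via set_gis(). A sketch of that delegation, with constructor details assumed rather than taken from this diff:

    class AGOLConnection:
        # stand-in stub for the repo's real AGOL connection class
        def get_already_processed_files(self, table_name):
            return []

    class Ingester:
        def __init__(self, dry_run, verbose=False):
            self.dry_run = dry_run
            self.verbose = verbose
            # assumption: the connection is now created internally,
            # rather than injected by callers via set_gis()
            self.gis = AGOLConnection()

        def get_already_processed_files(self, table_name):
            # thin wrapper so callers stop reaching into ingester.gis
            return self.gis.get_already_processed_files(table_name)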
@@ -142,12 +178,9 @@ def historical_pubsub(event, context):
     parser.add_argument("--make_historical_csv", action="store_true")
     parser.add_argument("--quiet", action="store_true")
     args = parser.parse_args()
-    if args.dry_run is not None:
-        dry_run = True
-    if args.make_historical_csv is not None:
-        make_historical_csv = True
-    print(f"dry_run: {dry_run}")
-    main(dry_run, datadir=args.dir, make_historical_csv=make_historical_csv)
 
+    print(f"dry_run: {args.dry_run}")
 
+    # note that the cli argument is --quiet but from here on the argument passed around is "verbose"
+    verbose = not args.quiet
+    main(args.dry_run, datadir=args.dir, make_historical_csv=args.make_historical_csv, verbose=verbose)
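The dropped "is not None" checks were a real bug, not just noise: argparse flags declared with action="store_true" default to False and are never None, so the old conditions always fired and forced dry_run and make_historical_csv to True on every invocation. The rewritten block passes the parsed booleans straight through. A standalone illustration of the semantics:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--dry_run", action="store_true")

    args = parser.parse_args([])             # flag omitted
    print(args.dry_run)                      # False
    print(args.dry_run is not None)          # True: why the old check always fired

    args = parser.parse_args(["--dry_run"])  # flag given
    print(args.dry_run)                      # True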
