
Commit

Merge pull request #13 from RTCovid/improve_processing_speed
Improve processing speed
mradamcox authored May 14, 2020
2 parents a6d501c + e2691cb commit bf1197f
Showing 5 changed files with 196 additions and 135 deletions.
38 changes: 24 additions & 14 deletions agol_connection.py
@@ -9,15 +9,16 @@

class AGOLConnection(object):

def __init__(self):
def __init__(self, verbose=False):

creds = self._load_credentials()
if creds is None:
raise Exception("no arcgis credentials supplied")

self.creds = creds
self.layers = self._get_layers()
self.gis = self._get_gis()
self.gis = self._make_connection()
self.verbose = verbose

def _load_credentials(self):

@@ -42,7 +43,7 @@ def _get_layers(self):

return configs

def _get_gis(self):
def _make_connection(self):
username = self.creds['username']
password = self.creds['password']
host = self.creds['host']
@@ -64,21 +65,25 @@ def get_arcgis_feature_collection_from_item_id(self, arcgis_item_id):

def overwrite_arcgis_layer(self, dataset_name, source_data_dir, source_data_file, dry_run=False):

print(f"Begin upload to ArcGIS Online")
if dry_run is True:
print("** DRY RUN -- NO UPLOAD WILL HAPPEN **")
if self.verbose:
print(f"Begin upload to ArcGIS Online")
if dry_run is True:
print("** DRY RUN -- NO UPLOAD WILL HAPPEN **")

try:
layer_config = self.layers[dataset_name]
except KeyError:
print(f"Invalid dataset name: {dataset_name}. Valid options are {list(self.layers.keys())}. Alter agol_layers.json to add more.")
if self.verbose:
print(f"Invalid dataset name: {dataset_name}. Valid options are"
" {list(self.layers.keys())}. Alter agol_layers.json to add more.")
return False

original_file_name = layer_config['original_file_name']
item_id = layer_config['id']

print(f" ArcGIS Online item id: {layer_config['id']}")
print(f" CSV name used for upload: {layer_config['original_file_name']}")
if self.verbose:
print(f" ArcGIS Online item id: {layer_config['id']}")
print(f" CSV name used for upload: {layer_config['original_file_name']}")

fs = self.get_arcgis_feature_collection_from_item_id(item_id)
# Overwrite docs:
@@ -96,19 +101,24 @@ def overwrite_arcgis_layer(self, dataset_name, source_data_dir, source_data_file
shutil.copyfile(os.path.join(source_data_dir, source_data_file),
os.path.join(tmpdirname, original_file_name))

print(f" local CSV file name: {source_data_dir}/{source_data_file}")
if self.verbose:
print(f" local CSV file name: {source_data_dir}/{source_data_file}")
original_dir = os.getcwd()
os.chdir(tmpdirname)
if dry_run is False:
try:
print(" starting upload...")
if self.verbose:
print(" starting upload...")
result = fs.manager.overwrite(original_file_name)
except Exception as e:
print(f"Caught exception {e} during upload, retrying")
if self.verbose:
print(f"Caught exception {e} during upload, retrying")
result = fs.manager.overwrite(original_file_name)
print(" finished.")
if self.verbose:
print(" finished.")
else:
result = "Dry run complete"
if self.verbose:
result = "Dry run complete"
os.chdir(original_dir)
return result

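The changes above make AGOLConnection quiet by default: progress messages only print when verbose=True is passed at construction time. A minimal usage sketch under that assumption (the directory and file name below are hypothetical; "summary_table" is one of the layer names referenced later in this commit):

from agol_connection import AGOLConnection

# Quiet by default; verbose=True restores the progress printing that was
# unconditional before this change.
agol = AGOLConnection(verbose=True)

# dry_run=True still skips the actual ArcGIS Online overwrite.
result = agol.overwrite_arcgis_layer(
    "summary_table",      # dataset name defined in agol_layers.json
    "/tmp/processed",     # hypothetical source directory
    "summary_table.csv",  # hypothetical source file name
    dry_run=True,
)
print(result)
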
13 changes: 13 additions & 0 deletions header_mapping.py
@@ -51,6 +51,19 @@ def get_fieldnames_and_aliases(self):

return self.get_fieldnames() + self.get_aliases()

def get_master_lookup(self):
"""This lookup has keys for all long names AND all short names. Each key
corresponds to the proper short name. Allows a single point of entry for
any header name."""

lookup = {}
for k, v in self.mapping.items():
lookup[k] = k
for alias in v:
lookup[alias] = k

return lookup


ltc_mapping = {}

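get_master_lookup collapses short names and their long-name aliases into a single dictionary whose values are always the short name, so any incoming header can be normalized with one lookup. A small sketch of that behaviour, assuming mapping has the same shape as HeaderMapping.mapping, a short name keyed to a list of aliases (the field names here are invented for illustration):

# Hypothetical mapping: short name -> list of long-name aliases.
mapping = {
    "numbeds": ["Number of Beds", "Total Beds"],
    "county": ["County Name"],
}

lookup = {}
for short, aliases in mapping.items():
    lookup[short] = short          # a short name maps to itself
    for alias in aliases:
        lookup[alias] = short      # every alias maps to its short name

assert lookup["Number of Beds"] == "numbeds"
assert lookup["numbeds"] == "numbeds"
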
140 changes: 72 additions & 68 deletions ingester.py
@@ -13,6 +13,7 @@
from operators import get_datetime_from_filename
from arcgis.features import FeatureLayerCollection, FeatureSet, Table, Feature
from validator import ValidationError
from agol_connection import AGOLConnection


def load_csv_to_df(csv_file_path):
@@ -62,16 +63,18 @@ def create_summary_table_row(df, source_data_timestamp, source_filename):

class Ingester(object):

def __init__(self, dry_run=False):
def __init__(self, dry_run=False, verbose=False):

creds = self._load_credentials()
if creds is None:
raise Exception("no sftp credentials supplied")

self.creds = creds
self.dry_run = dry_run
self.gis = None
agol_connection = AGOLConnection()
self.agol = agol_connection
self.available_files = []
self.verbose = verbose

def _load_credentials(self):

@@ -89,12 +92,8 @@ def _load_credentials(self):
creds['host'] = row['host']
return creds

def set_gis(self, gis_object):

self.gis = gis_object

def get_files_from_sftp(self, prefix="HOS_ResourceCapacity_", target_dir="/tmp",
only_latest=True, filenames_to_ignore=[]):
only_latest=True, filenames_to_ignore=[], verbose=False):

cnopts = pysftp.CnOpts()
cnopts.hostkeys.load('copaftp.pub')
@@ -121,19 +120,27 @@ def get_files_from_sftp(self, prefix="HOS_ResourceCapacity_", target_dir="/tmp",
files_to_get = files
for f in files_to_get:
if f in filenames_to_ignore:
print(f"Ignoring {f}")
if self.verbose:
print(f"Ignoring {f}")
continue
print(f"Getting: {f}")
if self.verbose:
print(f"Getting: {f}")
if os.path.join(target_dir, f) not in existing_files:
sftp.get(f, f'{target_dir}/{f}')
print(f"Finished downloading {target_dir}/{f}")
if self.verbose:
print(f"Finished downloading {target_dir}/{f}")
else:
print(f"Didn't have to download {target_dir}/{f}; it already exists")
if self.verbose:
print(f"Didn't have to download {target_dir}/{f}; it already exists")

source_date = get_datetime_from_filename(f, prefix=prefix)
file_details.append({"dir": target_dir, "filename": f, "source_datetime": source_date})
return (file_details, files)

def get_already_processed_files(self, dataset_name):

return self.agol.get_already_processed_files(dataset_name)

def process_hospital(self, processed_dir, processed_filename, public=True):

# public vs. non-public means different ArcGIS online items
@@ -142,32 +149,38 @@ def process_hospital(self, processed_dir, processed_filename, public=True):
else:
dataset_name = "hospital_layer"

print(f"Starting load of hospital data: {dataset_name}")
if self.verbose:
print(f"Starting load of hospital data: {dataset_name}")

status = self.gis.overwrite_arcgis_layer(dataset_name, processed_dir, processed_filename, dry_run=self.dry_run)
status = self.agol.overwrite_arcgis_layer(dataset_name, processed_dir, processed_filename, dry_run=self.dry_run)

print(status)
print(f"Finished load of hospital data: {dataset_name}")
if self.verbose:
print(status)
print(f"Finished load of hospital data: {dataset_name}")
return processed_dir, processed_filename

def process_supplies(self, processed_dir, processed_filename):
print("Starting load of supplies data")
if self.verbose:
print("Starting load of supplies data")

# set the new file name using the original file name in the layers conf
supplies_filename = self.gis.layers['supplies']['original_file_name']
supplies_filename = self.agol.layers['supplies']['original_file_name']

df = load_csv_to_df(os.path.join(processed_dir, processed_filename))
supplies = create_supplies_table(df)

supplies.to_csv(os.path.join(processed_dir, supplies_filename), index=False)

status = self.gis.overwrite_arcgis_layer("supplies", processed_dir, supplies_filename, dry_run=self.dry_run)
print(status)
print("Finished load of supplies data")
status = self.agol.overwrite_arcgis_layer("supplies", processed_dir, supplies_filename, dry_run=self.dry_run)

if self.verbose:
print(status)
print("Finished load of supplies data")

def process_county_summaries(self, processed_dir, processed_filename):

print("Starting load of county summary table...")
if self.verbose:
print("Starting load of county summary table...")

new_data_filename = "new_county_summary_table.csv"

@@ -190,14 +203,18 @@ def process_county_summaries(self, processed_dir, processed_filename):

d2.to_csv(os.path.join(processed_dir, new_data_filename), header=True, index=False)

status = self.gis.overwrite_arcgis_layer("county_summaries", processed_dir, new_data_filename, dry_run=self.dry_run)
print(status)
print("Finished load of county summary data")
status = self.agol.overwrite_arcgis_layer("county_summaries", processed_dir, new_data_filename, dry_run=self.dry_run)

if self.verbose:
print(status)
print("Finished load of county summary data")

def process_summaries(self, processed_dir, processed_file_details, make_historical_csv=False):
print("Starting load of summary table...")

summary_filename = self.gis.layers['summary_table']['original_file_name']
if self.verbose:
print("Starting load of summary table...")

summary_filename = self.agol.layers['summary_table']['original_file_name']

summary_df = pd.DataFrame()
for f in processed_file_details:
@@ -213,14 +230,15 @@ def process_summaries(self, processed_dir, processed_file_details, make_historic
if make_historical_csv:
out_csv_file = os.path.join(processed_dir, summary_filename)
summary_df.to_csv(out_csv_file, index=False, header=True)
print("Finished creation of historical summary table CSV, returning.")
if self.verbose:
print("Finished creation of historical summary table CSV, returning.")
return

layer_conf = self.gis.layers['summary_table']
layer_conf = self.agol.layers['summary_table']

# this self.gis.gis.content pattern is evidence that the first pass at
# a refactored structure should not be the last...
table = self.gis.gis.content.get(layer_conf['id'])
table = self.agol.gis.content.get(layer_conf['id'])
t = table.tables[0]

new_col_names = {}
@@ -237,64 +255,47 @@ def process_summaries(self, processed_dir, processed_file_details, make_historic
# but it won't stop the processing.
fs = FeatureSet(features)
if self.dry_run:
print("Dry run set, not editing features.")
status = "Dry run"
if self.verbose:
print("Dry run set, not editing features.")
else:
status = t.edit_features(adds=features)
print(status)
print("Finished load of summary table")
if self.verbose:
print(status)
if self.verbose:
print("Finished load of summary table")


def process_historical_hos(self, processed_dir, processed_file_details, make_historical_csv=False):

print("Starting load of historical HOS table...")
if self.verbose:
print("Starting load of historical HOS table...")

layer_conf = self.gis.layers['full_historical_table']
layer_conf = self.agol.layers['full_historical_table']
original_data_file_name = layer_conf['original_file_name']

table = self.gis.gis.content.get(layer_conf['id'])
table = self.agol.gis.content.get(layer_conf['id'])
t = table.layers[0]

# get short field names that are in use online to test the input csv headers
agol_fields = {n["alias"]: n["name"] for n in t.properties.fields}
# not used now but retained in case of future needs
# agol_fields = {n["alias"]: n["name"] for n in t.properties.fields}

# iterate all csvs and collect the information from each one.
# normalize header names at the same time
hist_csv_rows = []
mapping = hm.HeaderMapping("HOS")
alias_lookup = mapping.get_alias_lookup() # used to convert historical names (pre-5/12)
valid_fieldnames = mapping.get_fieldnames() # valid short names (post-5/12)
for f in processed_file_details:
fname = f["processed_filename"]
size = os.path.getsize(os.path.join(processed_dir, fname))
if size > 0:
processed_time = datetime.utcnow().isoformat()
processed_time = datetime.utcnow().isoformat()
with open(os.path.join(processed_dir, fname), newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:

out_row = {}

for col_name, value in row.items():
# first test if col_name in new valid headers
# if so, use the col_name and value directly
if col_name in valid_fieldnames:
out_row[col_name] = value
# next test if col_name in lookup of old aliases
# if so, convert the col_name
elif col_name in alias_lookup:
out_row[alias_lookup[col_name]] = value
print(f"Found a long name in {fname}: {col_name}")
# finally, raise exception if the field can't be matched
else:
msg = f"{fname}: Can't match field '{col_name}'"
raise ValidationError(msg)

out_row["Source Data Timestamp"] = f["source_datetime"].isoformat()
out_row["Processed At"] = processed_time
out_row["Source Filename"] = f["filename"]

hist_csv_rows.append(out_row)
row["Source Data Timestamp"] = f["source_datetime"].isoformat()
row["Processed At"] = processed_time
row["Source Filename"] = f["filename"]
hist_csv_rows.append(row)

else:
print(f"{fname} has a filesize of {size}, not processing.")
@@ -312,28 +313,31 @@ def process_historical_hos(self, processed_dir, processed_file_details, make_hi
features = [Feature(attributes=row) for row in hist_csv_rows]
fs = FeatureSet(features)
if self.dry_run:
print("Dry run set, not editing features.")
if self.verbose:
print("Dry run set, not editing features.")
else:
fc = len(features)
chunksize = 1000.0
feature_batchs = chunks(features, math.ceil(fc / chunksize))
fb_list = list(feature_batchs)
fbc = len(fb_list)
print(f"Adding {fc} features to the historical table in {fbc} batches.")
if self.verbose:
print(f"Adding {fc} features to the historical table in {fbc} batches.")
for batch in fb_list:
status = t.edit_features(adds=batch)
print(status)
print("Finished load of historical HOS table")
if self.verbose:
print("Finished load of historical HOS table")

def process_daily_hospital_averages(self, historical_gis_item_id, daily_averages_item_id):
# see what days have been processed
# if not processed,
# if not processed,
# get the historical table
# turn it into a df
# per day, get the averages
# for new: days
print("XXX daily_hospital_averages stub, returning.")
table = self.gis.content.get(historical_gis_item_id)
table = self.agol.gis.content.get(historical_gis_item_id)
t = table.layers[0]


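process_historical_hos pushes the historical rows to ArcGIS in batches via a chunks helper that is not part of this diff. A rough sketch of the batching arithmetic, assuming chunks(seq, n) splits a sequence into n roughly equal slices (the feature count is illustrative):

import math

def chunks(seq, n):
    # Assumed behaviour of the helper used above: split seq into n
    # roughly equal slices. The real implementation may differ.
    size = math.ceil(len(seq) / n)
    return [seq[i:i + size] for i in range(0, len(seq), size)]

features = list(range(2500))   # stand-in for the Feature objects
chunksize = 1000.0             # target batch size from the diff
batches = chunks(features, math.ceil(len(features) / chunksize))

# Three batches of at most 1000 rows; the real code passes each batch
# to t.edit_features(adds=batch).
assert len(batches) == 3
assert all(len(b) <= 1000 for b in batches)
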
