#1028 Deduplicate get_layer function and combine three insert_data functions into one
leo-oxu committed Oct 3, 2024
1 parent 37b8b8f commit c89aab5
Showing 1 changed file with 47 additions and 197 deletions.
244 changes: 47 additions & 197 deletions gis/gccview/gcc_puller_functions.py
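
In brief, this commit folds the three near-identical loaders (insert_audited_data, insert_partitioned_data, and the old insert_data) into a single insert_data selected by two flags. Below is a condensed sketch of the resulting control flow, not the literal committed code; build_rows is a hypothetical stand-in for the row-assembly loop that appears in full in the diff.

from psycopg2 import sql
from psycopg2.extras import execute_values

def insert_data(output_table, insert_column, return_json, schema_name, con,
                is_audited, is_partitioned):
    # build_rows (hypothetical helper) converts esriFieldTypeDate attributes,
    # appends the geometry, and prepends today's date when is_partitioned is set
    rows = build_rows(return_json, is_partitioned)
    # audited loads go into a temp table named '_' + output_table
    target = '_' + output_table if is_audited else output_table
    insert = sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format(
        schema_table=sql.Identifier(schema_name, target),
        columns=insert_column,
    )
    with con:
        with con.cursor() as cur:
            execute_values(cur, insert, rows)
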
@@ -141,8 +141,8 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co
LOGGER.info(create_sql.as_string(con))
cur.execute(create_sql)

owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name))
cur.execute(owner_sql)
# owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name))
# cur.execute(owner_sql)

# Add a pk
with con:
@@ -390,88 +390,13 @@ def find_limit(return_json):
keep_adding = False
return keep_adding

def insert_audited_data(output_table, insert_column, return_json, schema_name, con):
"""
Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables)
Parameters
----------
output_table : string
Table name for postgresql, returned from function get_tablename
insert_column : SQL composed
Composed object of column name and types use for creating a new postgresql table
return_json : json
Resulted json response from calling the api, returned from function get_data
schema_name : string
The schema in which the table will be inserted into
con: Airflow Connection
Could be the connection to bigdata or to on-prem server
"""
def insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned):
rows = []
features = return_json['features']
fields = return_json['fields']
trials = [[field['name'], field['type']] for field in fields]

for feature in features:
geom = feature['geometry']
geometry_type = return_json['geometryType']
geometry = get_geometry(geometry_type, geom)

row = []
for trial in trials:
if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None:
row.append(to_time(feature['attributes'][trial[0]]))
else:
row.append(feature['attributes'][trial[0]])

row.append(geometry)

rows.append(row)

# Since this is a temporary table, name it '_table' as opposed to 'table' for now (for audited tables)
temp_table_name = '_' + output_table

insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format(
schema_table = sql.Identifier(schema_name, temp_table_name),
columns = insert_column
)
with con:
with con.cursor() as cur:
execute_values(cur, insert, rows)
LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table)

def insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con):
"""
Function to insert data to our postgresql database (for partitioned tables)
Parameters
----------
schema_parent_table_insert : string
Table name for postgresql, returned from function create_partitioned_table
insert_column : SQL composed
Composed object of column name and types use for creating a new postgresql table
return_json : json
Resulted json response from calling the api, returned from function get_data
schema_name : string
The schema in which the table will be inserted into
con: Airflow Connection
Could be the connection to bigdata or to on-prem server
"""

today_string = datetime.date.today().strftime('%Y-%m-%d')

rows = []
features = return_json['features']
fields = return_json['fields']
trials = [[field['name'], field['type']] for field in fields]
for feature in features:
geom = feature['geometry']
geometry_type = return_json['geometryType']
@@ -483,69 +408,21 @@ def insert_partitioned_data(schema_parent_table_insert, insert_column, return_js
row.append(to_time(feature['attributes'][trial[0]]))
else:
row.append(feature['attributes'][trial[0]])

row.insert(0, today_string)

if is_partitioned:
row.insert(0, today_string)
row.append(geometry)

rows.append(row)


insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format(
schema_table = sql.Identifier(schema_name, schema_parent_table_insert),
columns = insert_column
)
with con:
with con.cursor() as cur:
execute_values(cur, insert, rows)
LOGGER.info('Successfully inserted %d records into %s', len(rows), schema_parent_table_insert)

def insert_data(output_table, insert_column, return_json, schema_name, con):
"""
Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables)
Parameters
----------
output_table : string
Table name for postgresql, returned from function get_tablename
if is_audited:
output_table = '_' + output_table

insert_column : SQL composed
Composed object of column name and types use for creating a new postgresql table
return_json : json
Resulted json response from calling the api, returned from function get_data
schema_name : string
The schema in which the table will be inserted into
con: Airflow Connection
Could be the connection to bigdata or to on-prem server
"""
rows = []
features = return_json['features']
fields = return_json['fields']
trials = [[field['name'], field['type']] for field in fields]

for feature in features:
geom = feature['geometry']
geometry_type = return_json['geometryType']
geometry = get_geometry(geometry_type, geom)

row = []
for trial in trials:
if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None:
row.append(to_time(feature['attributes'][trial[0]]))
else:
row.append(feature['attributes'][trial[0]])

row.append(geometry)

rows.append(row)


insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format(
schema_table = sql.Identifier(schema_name, output_table),
columns = insert_column
)

with con:
with con.cursor() as cur:
execute_values(cur, insert, rows)
@@ -649,7 +526,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
return successful_execution
#-------------------------------------------------------------------------------------------------------
# base main function, also compatible with Airflow
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_manual = False):
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = False):
"""
This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database.
@@ -694,71 +571,42 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
keep_adding = True
counter = 0
#--------------------------------
if is_manual:
while keep_adding == True:

if counter == 0:
return_json = get_data(mapserver, layer_id)
insert_column = create_table(output_table, return_json, schema_name, con)

features = return_json['features']
record_max=len(features)
max_number = record_max
else:
return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max)

# Insert data into the table
insert_data(output_table, insert_column, return_json, schema_name, con)

# Update loop variables
counter += 1
keep_adding = find_limit(return_json)

if keep_adding:
max_number += record_max
else:
LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)
else:
if is_audited and primary_key is None:
LOGGER.error("Audited tables should have a primary key.")
if not(is_audited) and primary_key is not None:
LOGGER.error("Non-audited tables do not use the primary key.")
#--------------------------------
while keep_adding == True:
if counter == 0:
# Without specifying the max_number and record_max, the number of data we get from this query
# should be the max record count of the layer if the layer is huge
return_json = get_data(mapserver, layer_id)
if is_audited:
(insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con)
else:
(insert_column, schema_parent_table_insert) = create_partitioned_table(output_table, return_json, schema_name, con)

features = return_json['features']
record_max=(len(features))
max_number = record_max
else:
return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max)

# Insert data into the table
if not(is_audited) and primary_key is not None:
LOGGER.error("Non-audited tables do not use the primary key.")
#--------------------------------
while keep_adding == True:
if counter == 0:
return_json = get_data(mapserver, layer_id)
if is_audited:
insert_audited_data(output_table, insert_column, return_json, schema_name, con)
(insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con)
elif is_partitioned:
(insert_column, output_table) = create_partitioned_table(output_table, return_json, schema_name, con)
else:
insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con)
# Update loop variables
counter += 1
keep_adding = find_limit(return_json)
insert_column = create_table(output_table, return_json, schema_name, con)
features = return_json['features']
record_max=(len(features))
max_number = record_max
else:
return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max)

if keep_adding:
max_number += record_max
else:
LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)
insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned)

counter += 1
keep_adding = find_limit(return_json)

if keep_adding:
max_number += record_max
else:
LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)

if is_audited:
try:
update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con)
except Exception as err:
LOGGER.exception("Unable to update table %s", err)

@click.command()
@click.option('--mapserver', '-ms', type = int, required = True,
@@ -768,12 +616,14 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
@click.option('--schema-name', '-s', type = str, required = True
, help = 'Name of destination schema')
@click.option('--is-audited', '-a', is_flag=True, show_default=True, default=False,
help = 'Whether the table is supposed to be audited (T) or partitioned (F)')
help = 'Whether the table is supposed to be audited (T) or non-audited (F)')
@click.option('--primary-key', '-pk', type = str, default=None, required = False,
help = 'Primary key. Only include if table is audited.')
@click.option('--con', '-c', type = str, required = True,
help = 'The path to the credential config file')
def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con):
@click.option('--is-partitioned', '-p', is_flag=True, show_default=True, default=False,
help = 'Whether the table is supposed to be partitioned (T) or not partitioned (F)')
def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con, is_partitioned=True):
"""
This script pulls a GIS layer from GCC servers into the databases of
the Data and Analytics Unit.
@@ -794,8 +644,8 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key,
is_audited = is_audited,
primary_key = primary_key,
con=connection_obj,
is_manual = True
is_partitioned = is_partitioned
)

if __name__ == '__main__':
manual_get_layer()
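
For reference, a hypothetical invocation of the deduplicated get_layer under its new signature, assuming the module is importable as gcc_puller_functions; the mapserver/layer numbers, schema names, primary key, and DSN are placeholders, and the --layer-id CLI flag is inferred from the function signature rather than shown in this diff.

import psycopg2
from gcc_puller_functions import get_layer

connection_obj = psycopg2.connect("dbname=bigdata")  # placeholder DSN

# Audited layer: primary key required; rows land in a temp '_'-prefixed table,
# then update_table() upserts them into the permanent table.
get_layer(26, 2, 'gis', is_audited=True, primary_key='objectid', con=connection_obj)

# Partitioned layer: no primary key; today's date is prepended to every row.
get_layer(26, 2, 'gis_core', is_audited=False, is_partitioned=True, con=connection_obj)

# Roughly equivalent manual pull through the CLI defined above:
#   python gcc_puller_functions.py -ms 26 --layer-id 2 -s gis_core -c config.cfg -p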
