From bb684bfc8810cc5d13cbc54bb08a7ec547a99b46 Mon Sep 17 00:00:00 2001 From: chmnata Date: Thu, 1 Aug 2024 20:49:58 +0000 Subject: [PATCH 01/14] #1028 add generic puller function --- gis/gccview/gcc_layer_puller.py | 277 ++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 gis/gccview/gcc_layer_puller.py diff --git a/gis/gccview/gcc_layer_puller.py b/gis/gccview/gcc_layer_puller.py new file mode 100644 index 000000000..d878ea169 --- /dev/null +++ b/gis/gccview/gcc_layer_puller.py @@ -0,0 +1,277 @@ +import configparser +import requests +import datetime +from psycopg2 import connect +from psycopg2 import sql +from psycopg2.extras import execute_values +import logging +from time import sleep +import click +from pathlib import Path +import configparser +from psycopg2 import connect +CONFIG = configparser.ConfigParser() +CONFIG.read(str(Path.home().joinpath('db.cfg'))) #Creates a path to your db.cfg file +dbset = CONFIG['DB_SETTINGS'] +con = connect(**dbset) + +LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +from gcc_puller_functions import (mapserver_name, get_tablename, get_fieldtype, to_time, find_limit, get_geometry) + +def create_table(output_table, return_json, schema_name, con): + """ + Function to create a new table in postgresql for the layer (for audited tables only) + + Parameter + --------- + output_table : string + Table name for postgresql, returned from function get_tablename + + return_json : json + Resulted json response from calling the api, returned from function get_data + + schema_name : string + The schema in which the table will be inserted into + + primary_key : string + Primary key for this layer, returned from dictionary pk_dict + + con: Airflow Connection + Could be the connection to bigdata or to on-prem server + + Returns + -------- + insert_columm : SQL composed + Composed object of column name and types use for creating a new postgresql table + + excluded_column : SQL composed + Composed object that is similar to insert_column, but has 'EXCLUDED.' 
attached before each column name, used for UPSERT query + """ + + fields = return_json['fields'] + insert_column_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) for field in fields] + insert_column_list.append(sql.Identifier('geom')) + insert_column = sql.SQL(',').join(insert_column_list) + + # Since this is a temporary table, name it '_table' as opposed to 'table' for now + temp_table_name = '_' + output_table + + with con: + with con.cursor() as cur: + + col_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) + sql.SQL(' ') + sql.SQL(get_fieldtype(field["type"])) for field in fields] + col_list.append(sql.Identifier('geom') + sql.SQL(' ') + sql.SQL('geometry')) + col_list_string = sql.SQL(',').join(col_list) + + LOGGER.info(col_list_string.as_string(con)) + create_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {schema_table} ({columns})").format(schema_table = sql.Identifier(schema_name, temp_table_name), + columns = col_list_string) + LOGGER.info(create_sql.as_string(con)) + cur.execute(create_sql) + + return insert_column + +def get_data(mapserver, layer_id, max_number = None, record_max = None): + """ + Function to retreive layer data from GCCView rest api + + Parameters + ----------- + mapserver : string + The name of the mapserver we are accessing, returned from function mapserver_name + + layer_id : integer + Unique layer id that represent a single layer in the mapserver + + max_number : integer + Number for parameter `resultOffset` in the query, indicating the number of rows this query is going to skip + + record_max : integer + Number for parameter `resultRecordCount` in the query, indicating the number of rows this query is going to fetch + + Returns + -------- + return_json : json + Resulted json response from calling the GCCView rest api + """ + return_json = None + base_url = "https://insideto-gis.toronto.ca/arcgis/rest/services/{}/MapServer/{}/query".format(mapserver, layer_id) + + query = {"where":"1=1", + "outFields": "*", + "outSR": '4326', + "returnGeometry": "true", + "returnTrueCurves": "false", + "returnIdsOnly": "false", + "returnCountOnly": "false", + "returnZ": "false", + "returnM": "false", + "orderByFields": "OBJECTID", + "returnDistinctValues": "false", + "returnExtentsOnly": "false", + "resultOffset": "{}".format(max_number), + "resultRecordCount": "{}".format(record_max), + "f":"json"} + + for retry in range(3): + try: + r = requests.get(base_url, params = query, verify = False, timeout = 300) + r.raise_for_status() + except requests.exceptions.HTTPError as err_h: + LOGGER.error("Invalid HTTP response: ", err_h) + except requests.exceptions.ConnectionError as err_c: + LOGGER.error("Network problem: ", err_c) + sleep(10) + except requests.exceptions.Timeout as err_t: + LOGGER.error("Timeout: ", err_t) + except requests.exceptions.RequestException as err: + LOGGER.error("Error: ", err) + else: + return_json = r.json() + break + + return return_json + +def insert_data(output_table, insert_column, return_json, schema_name, con): + """ + Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables) + + Parameters + ---------- + output_table : string + Table name for postgresql, returned from function get_tablename + + insert_column : SQL composed + Composed object of column name and types use for creating a new postgresql table + + return_json : json + Resulted json response from calling the api, returned from function get_data + + schema_name : string + The schema in which the table will 
be inserted into + + con: Airflow Connection + Could be the connection to bigdata or to on-prem server + """ + rows = [] + features = return_json['features'] + fields = return_json['fields'] + trials = [[field['name'], field['type']] for field in fields] + + for feature in features: + geom = feature['geometry'] + geometry_type = return_json['geometryType'] + geometry = get_geometry(geometry_type, geom) + + row = [] + for trial in trials: + if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None: + row.append(to_time(feature['attributes'][trial[0]])) + else: + row.append(feature['attributes'][trial[0]]) + + row.append(geometry) + + rows.append(row) + + # Since this is a temporary table, name it '_table' as opposed to 'table' for now (for audited tables) + temp_table_name = '_' + output_table + + insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( + schema_table = sql.Identifier(schema_name, temp_table_name), + columns = insert_column + ) + with con: + with con.cursor() as cur: + execute_values(cur, insert, rows) + LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table) + + +def get_layer(mapserver_n, layer_id, schema_name, con = None): + """ + This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. + + Parameters + ---------- + mapserver : int + The name of the mapserver that host the desired layer + + layer_id : int + The id of desired layer + + schema_name : string + The schema in which the table will be inserted into + + con: connection to database + Connection object that can connect to a particular database + Expects a valid con object if using command prompt + """ + successful_task_run = True + + # At this point, there should must be a con now + if con is None: + LOGGER.error("Unable to establish connection to the database, please pass in a valid con") + return + + mapserver = mapserver_name(mapserver_n) + output_table = get_tablename(mapserver, layer_id) + if output_table is None: + LOGGER.error("Invalid mapserver and/or layer Id") + return + #-------------------------------- + keep_adding = True + counter = 0 + + while keep_adding == True: + + if counter == 0: + return_json = get_data(mapserver, layer_id) + insert_column = create_table(output_table, return_json, schema_name, con) + + features = return_json['features'] + record_max=(len(features)) + max_number = record_max + + insert_data(output_table, insert_column, return_json, schema_name, con) + + counter += 1 + keep_adding = find_limit(return_json) + if keep_adding == False: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + else: + return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) + insert_data(output_table, insert_column, return_json, schema_name, con) + + counter += 1 + keep_adding = find_limit(return_json) + if keep_adding == True: + max_number = max_number + record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + +@click.command() +@click.option('--mapserver', '-m', type = int, required = True, + help = 'Mapserver number, e.g. 
cotgeospatial_2 will be 2') +@click.option('--layer-id', '-l', type = int, required = True + , help = 'Layer id') +@click.option('--schema-name', '-s', type = str, required = True + , help = 'Name of destination schema') +def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con): + """ + This script pulls a GIS layer from GCC servers into the databases of the Data and Analytics Unit. + + Example: + + python gcc_puller_functions.py --mapserver 28 --layer-id 28 + --schema-name gis --con db.cfg + """ + CONFIG.read(con) + dbset = CONFIG['DBSETTINGS'] + connection_obj = connect(**dbset) + # get_layer function + get_layer(mapserver, layer_id, schema_name, con=connection_obj) + +if __name__ == '__main__': + manual_get_layer() From 7b59ee26403e991bd25320f3fd04711c6dfaeff6 Mon Sep 17 00:00:00 2001 From: chmnata Date: Tue, 13 Aug 2024 23:21:22 +0000 Subject: [PATCH 02/14] #1028 update gcc layer puller --- gis/gccview/gcc_layer_puller.py | 185 ++++++++++++++++++++++++++------ 1 file changed, 152 insertions(+), 33 deletions(-) diff --git a/gis/gccview/gcc_layer_puller.py b/gis/gccview/gcc_layer_puller.py index d878ea169..a3dbe9808 100644 --- a/gis/gccview/gcc_layer_puller.py +++ b/gis/gccview/gcc_layer_puller.py @@ -5,20 +5,145 @@ from psycopg2 import sql from psycopg2.extras import execute_values import logging +import os from time import sleep import click from pathlib import Path import configparser -from psycopg2 import connect CONFIG = configparser.ConfigParser() -CONFIG.read(str(Path.home().joinpath('db.cfg'))) #Creates a path to your db.cfg file -dbset = CONFIG['DB_SETTINGS'] -con = connect(**dbset) +from psycopg2 import connect LOGGER = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) -from gcc_puller_functions import (mapserver_name, get_tablename, get_fieldtype, to_time, find_limit, get_geometry) +# Geometry Switcher +def line(geom): + return 'SRID=4326;LineString('+','.join(' '.join(str(x) for x in tup) for tup in geom['paths'][0]) +')' +def polygon(geom): + return 'SRID=4326;MultiPolygon((('+','.join(' '.join(str(x) for x in tup) for tup in geom['rings'][0]) +')))' +def point(geom): + return 'SRID=4326;Point('+(str(geom['x']))+' '+ (str(geom['y']))+')' +def get_geometry(geometry_type, geom): + switcher = { + 'esriGeometryLine':line, + 'esriGeometryPolyline': line, + 'esriGeometryPoint': point, + 'esriGeometryMultiPolygon': polygon, + 'esriGeometryPolygon': polygon + } + func = switcher.get(geometry_type) + geometry = (func(geom)) + return geometry + +def to_time(input): + """ + Convert epoch time to postgresql timestamp without time zone + + Parameters + ----------- + input : string + Epoch time attribute in return_json + + Returns + -------- + time : string + Time in the type of postgresql timestamp without time zone + """ + + time = datetime.datetime.fromtimestamp(abs(input)/1000).strftime('%Y-%m-%d %H:%M:%S') + return time + +def mapserver_name(mapserver_n): + """ + Function to return the mapserver name from integer + + Parameters + ------------ + mapserver_n : numeric + The number of mapserver we will be accessing. 
0 for 'cot_geospatial' + + Returns + -------- + mapserver_name : string + The name of the mapserver + """ + + if mapserver_n == 0: + mapserver_name = 'cot_geospatial' + else: + mapserver_name = 'cot_geospatial' + str(mapserver_n) + + return(mapserver_name) + +def get_tablename(mapserver, layer_id): + """ + Function to return the name of the layer + + Parameters + ----------- + mapserver: string + The name of the mapserver we are accessing, returned from function mapserver_name + + layer_id: integer + Unique layer id that represent a single layer in the mapserver + + Returns + -------- + output_name + The table name of the layer in database + """ + + url = 'https://insideto-gis.toronto.ca/arcgis/rest/services/'+mapserver+'/MapServer/layers?f=json' + try: + r = requests.get(url, verify = False, timeout = 20) + r.raise_for_status() + except requests.exceptions.HTTPError as err_h: + LOGGER.error("Invalid HTTP response: ", err_h) + except requests.exceptions.ConnectionError as err_c: + LOGGER.error("Network problem: ", err_c) + except requests.exceptions.Timeout as err_t: + LOGGER.error("Timeout: ", err_t) + except requests.exceptions.RequestException as err: + LOGGER.error("Error: ", err) + else: + ajson = r.json() + layers = ajson['layers'] + for layer in layers: + if layer['id'] == layer_id: + output_name = (layer['name'].lower()).replace(' ', '_') + return output_name + +def find_limit(return_json): + """ + Function to check if last query return all rows + + Parameters + ----------- + return_json : json + Resulted json response from calling the api, returned from function get_data + + Returns + -------- + keep_adding : Boolean + boolean 'keep_adding' indicating if last query returned all rows in the layer + """ + + if return_json.get('exceededTransferLimit', False) == True: + keep_adding = True + else: + keep_adding = False + return keep_adding + +def get_fieldtype(field): + if field == 'esriFieldTypeInteger' or field == 'esriFieldTypeSingle' or field == 'esriFieldTypeInteger' or field=='esriFieldTypeOID' or field == 'esriFieldTypeSmallInteger' or field =='esriFieldGlobalID': + fieldtype = 'integer' + elif field == 'esriFieldTypeString': + fieldtype = 'text' + elif field == 'esriFieldTypeDouble': + fieldtype = 'numeric' + elif field == 'esriFieldTypeDate': + fieldtype = 'timestamp without time zone' + return fieldtype def create_table(output_table, return_json, schema_name, con): """ @@ -55,9 +180,6 @@ def create_table(output_table, return_json, schema_name, con): insert_column_list.append(sql.Identifier('geom')) insert_column = sql.SQL(',').join(insert_column_list) - # Since this is a temporary table, name it '_table' as opposed to 'table' for now - temp_table_name = '_' + output_table - with con: with con.cursor() as cur: @@ -66,7 +188,7 @@ def create_table(output_table, return_json, schema_name, con): col_list_string = sql.SQL(',').join(col_list) LOGGER.info(col_list_string.as_string(con)) - create_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {schema_table} ({columns})").format(schema_table = sql.Identifier(schema_name, temp_table_name), + create_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {schema_table} ({columns})").format(schema_table = sql.Identifier(schema_name, output_table), columns = col_list_string) LOGGER.info(create_sql.as_string(con)) cur.execute(create_sql) @@ -176,11 +298,9 @@ def insert_data(output_table, insert_column, return_json, schema_name, con): rows.append(row) - # Since this is a temporary table, name it '_table' as opposed to 'table' for now (for audited tables) - 
temp_table_name = '_' + output_table insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( - schema_table = sql.Identifier(schema_name, temp_table_name), + schema_table = sql.Identifier(schema_name, output_table), columns = insert_column ) with con: @@ -231,34 +351,33 @@ def get_layer(mapserver_n, layer_id, schema_name, con = None): insert_column = create_table(output_table, return_json, schema_name, con) features = return_json['features'] - record_max=(len(features)) + record_max=len(features) max_number = record_max - - insert_data(output_table, insert_column, return_json, schema_name, con) - - counter += 1 - keep_adding = find_limit(return_json) - if keep_adding == False: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) else: return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - insert_data(output_table, insert_column, return_json, schema_name, con) - - counter += 1 - keep_adding = find_limit(return_json) - if keep_adding == True: - max_number = max_number + record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + + # Insert data into the table + insert_data(output_table, insert_column, return_json, schema_name, con) + + # Update loop variables + counter += 1 + keep_adding = find_limit(return_json) + + if keep_adding: + max_number += record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) @click.command() @click.option('--mapserver', '-m', type = int, required = True, help = 'Mapserver number, e.g. cotgeospatial_2 will be 2') -@click.option('--layer-id', '-l', type = int, required = True - , help = 'Layer id') -@click.option('--schema-name', '-s', type = str, required = True - , help = 'Name of destination schema') -def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con): +@click.option('--layer-id', '-l', type = int, required = True, + help = 'Layer id') +@click.option('--schema-name', '-s', type = str, required = True, + help = 'Name of destination schema') +@click.option('--con', '-c', type = str, default=os.path.expanduser('~/db.cfg'), + help = 'The path to the credential config file. Default is ~/db.cfg') +def manual_get_layer(mapserver, layer_id, schema_name, con): """ This script pulls a GIS layer from GCC servers into the databases of the Data and Analytics Unit. 
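The click command above is a thin wrapper around `get_layer`: it reads the file passed via `--con`, builds a psycopg2 connection from its `[DBSETTINGS]` section, and hands that connection through. For one-off pulls from a Python session, the same function can be called directly. A minimal sketch, assuming a `~/db.cfg` whose `[DBSETTINGS]` section holds the usual psycopg2 connection keywords (host, dbname, user, password) — the file path and keys are assumptions about the local setup, not part of the patch:

```python
# Sketch only: pull one layer programmatically instead of via the CLI.
# Assumes ~/db.cfg has a [DBSETTINGS] section with psycopg2 connection keywords.
import configparser
from pathlib import Path
from psycopg2 import connect

from gcc_layer_puller import get_layer

config = configparser.ConfigParser()
config.read(str(Path.home().joinpath('db.cfg')))
con = connect(**config['DBSETTINGS'])

# Library layer: mapserver 28, layer id 28, loaded into the gis schema.
get_layer(28, 28, 'gis', con=con)
con.close()
```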
From 63f987c3e6df35ba7c787f94aa63963f4875f1f7 Mon Sep 17 00:00:00 2001 From: chmnata Date: Wed, 21 Aug 2024 19:55:31 +0000 Subject: [PATCH 03/14] #1028 update readme --- gis/gccview/README.md | 58 ++++++++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/gis/gccview/README.md b/gis/gccview/README.md index 3e8b1f9f4..13738c7de 100644 --- a/gis/gccview/README.md +++ b/gis/gccview/README.md @@ -1,12 +1,9 @@ # GCCVIEW pipeline * [Overview](#overview) * [Where the layers are pulled](#where-the-layers-are-pulled) - * [How the script works](#how-the-script-works) + * [Data Pipeline](#data-pipeline) * [Adding new layers to GCC Puller DAG](#adding-new-layers-to-gcc-puller-dag) - * [Manually fetch layers - Using Jupyter Notebook](#manually-fetch-layers---using-jupyter-notebook) - * [Manually fetch layers - Using Click in command prompt](#manually-fetch-layers---using-click-in-command-prompt) - - + * [Manually fetch layers](#manually-fetch-layers) ## Overview @@ -80,7 +77,7 @@ The GCC pipeline will be pulling multiple layers into the `gis_core` and `gis` s |school|28|17| |library|28|28| -## How the script works +## Data Pipeline The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means: @@ -97,29 +94,44 @@ In the DAG file, the arguments for each layer are stored in dictionaries called 2. Add a new entry to "bigdata_layers" or "ptc_layers" dictionaries in [gcc_layers_pull.py](/dags/gcc_layers_pull.py) depending on the destination database. 3. If is_audited = True, you must also add a primary key for the new layer to "pk_dict" in [gcc_puller_functions.py](gcc_puller_functions.py). -## Manually fetch layers - Using Jupyter Notebook +## Manually fetch layers + +If you need to pull a layer as a one-off task, this script allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on a on-prem server as it needs connection to insideto. + +Before running the script, ensure that you have set up the appropriate environment with all necessary packages installed. You might have to set the `https_proxy` in your environment with your novell username and password in order to clone this repo or install packages. If you run into any issues, don't hestitate to ask a sysadmin. You can then install all packages in the `requirement.txt`, either with: +1) Activate your virtual environment, it should automatically install them for you + + Pipenv: + + `pipenv shell` + + `pipenv install` + + Venv: + + `source .venv/bin/activate` + + `python3 -m pip install -r requirements.txt` +2) Install packages with pip if you are not using a virtual environment (you should) + + `pip install -r requirements.txt` + -One option is to use [this notebook](./gcc_puller.ipynb) on Morbius server environment to fetch layer from gccview rest api and send it to postgresql in the schema you want. +Now you are set to run the script! -To use the Jupyter notebook: -1. Know the name of the layer you want to fetch. -2. Look for the mapserver that host the layer, and the layer id using the tables above. -3. Determine the schema of where you want the downloaded table to be. -4. Enter the .cfg file path at the 'Config' code block. -5. Enter the variables using the pre-existing template code block provided at the end of the notebook file. -6. 
Execute the code blocks from top to bottom. -7. Open pgAdmin, go to the specified schema and check if the layer's information had been pulled correctly. +There are 4 inputs that need to be entered. -Note that if you want to pull a partitioned child table into your personal schema, you need to set up the parent table first. Refer to the .sql files in `/gis/gccview/sql`. +`--mapserver`: Mapserver number, e.g. cotgeospatial_2 will be 2 -## Manually fetch layers - Using Click in command prompt +`--layer-id`: Layer id -The second option is to execute `gcc_puller_functions.py` in command prompt (or venv in Morbius or other servers). +`--schema-name`: Name of destination schema -There are 5 inputs that need to be entered, which are very similar to the ones listed above for function `get_layer`. The only difference is that the last parameter now needs to be a string that contains the path to your .cfg file. +`--con`(optional): The path to the credential config file. Default is ~/db.cfg -Run the following line to see the details of how to enter each parameter with Click. +Example of pulling the library layer to the gis schema. -```python3 {FULL_PATH_TO_gcc_puller_functions.py} --help``` -Note that if the script doesn't work, one reason might be because your credentials don't have access to the GCC API. +```python +python gcc_layer_puller.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg +``` From 8b3efad6e0c3bbd51b9170318ad8a956fd5e1c13 Mon Sep 17 00:00:00 2001 From: chmnata Date: Wed, 21 Aug 2024 19:56:31 +0000 Subject: [PATCH 04/14] #1028 add requirement.txt --- gis/gccview/requirements.txt | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 gis/gccview/requirements.txt diff --git a/gis/gccview/requirements.txt b/gis/gccview/requirements.txt new file mode 100644 index 000000000..12ec9e662 --- /dev/null +++ b/gis/gccview/requirements.txt @@ -0,0 +1,8 @@ +certifi==2024.7.4 +charset-normalizer==3.3.2 +click==8.1.7 +idna==3.7 +psycopg2-binary==2.9.9 +requests==2.32.3 +typing-extensions==4.12.2 +urllib3==2.2.2 \ No newline at end of file From e8ee420ebd9a636ad7d83a9c4bc08717051d1dea Mon Sep 17 00:00:00 2001 From: chmnata Date: Fri, 23 Aug 2024 16:01:47 +0000 Subject: [PATCH 05/14] #1028 update readme --- gis/gccview/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gis/gccview/README.md b/gis/gccview/README.md index 13738c7de..8aa453bb3 100644 --- a/gis/gccview/README.md +++ b/gis/gccview/README.md @@ -79,7 +79,7 @@ The GCC pipeline will be pulling multiple layers into the `gis_core` and `gis` s ## Data Pipeline -The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means: +The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means: - mapserver_n (int): ID of the mapserver that host the desired layer - layer_id (int): ID of the layer within the particular mapserver @@ -91,8 +91,8 @@ In the DAG file, the arguments for each layer are stored in dictionaries called ## Adding new layers to GCC Puller DAG 1. Identify the mapserver_n and layer_id for the layer you wish to add. 
You can find COT transportation layers here: https://insideto-gis.toronto.ca/arcgis/rest/services/cot_geospatial2/FeatureServer, where mapserver_n is 2 and the layer_id is in brackets after the layer name. -2. Add a new entry to "bigdata_layers" or "ptc_layers" dictionaries in [gcc_layers_pull.py](/dags/gcc_layers_pull.py) depending on the destination database. -3. If is_audited = True, you must also add a primary key for the new layer to "pk_dict" in [gcc_puller_functions.py](gcc_puller_functions.py). +2. Add a new entry to "bigdata_layers" or "ptc_layers" dictionaries in airflow's variable depending on the destination database. +3. If is_audited = True, you must also add a primary key for the new layer to "pk_dict" in the corresponding airflow variable. ## Manually fetch layers From f6c6898fe2b038516d5849897c276816f53b7382 Mon Sep 17 00:00:00 2001 From: chmnata Date: Fri, 23 Aug 2024 16:02:08 +0000 Subject: [PATCH 06/14] #1028 reimport functions --- gis/gccview/gcc_layer_puller.py | 143 +++----------------------------- 1 file changed, 12 insertions(+), 131 deletions(-) diff --git a/gis/gccview/gcc_layer_puller.py b/gis/gccview/gcc_layer_puller.py index a3dbe9808..32c85f285 100644 --- a/gis/gccview/gcc_layer_puller.py +++ b/gis/gccview/gcc_layer_puller.py @@ -16,134 +16,16 @@ LOGGER = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) -# Geometry Switcher -def line(geom): - return 'SRID=4326;LineString('+','.join(' '.join(str(x) for x in tup) for tup in geom['paths'][0]) +')' -def polygon(geom): - return 'SRID=4326;MultiPolygon((('+','.join(' '.join(str(x) for x in tup) for tup in geom['rings'][0]) +')))' -def point(geom): - return 'SRID=4326;Point('+(str(geom['x']))+' '+ (str(geom['y']))+')' -def get_geometry(geometry_type, geom): - switcher = { - 'esriGeometryLine':line, - 'esriGeometryPolyline': line, - 'esriGeometryPoint': point, - 'esriGeometryMultiPolygon': polygon, - 'esriGeometryPolygon': polygon - } - func = switcher.get(geometry_type) - geometry = (func(geom)) - return geometry - -def to_time(input): - """ - Convert epoch time to postgresql timestamp without time zone - - Parameters - ----------- - input : string - Epoch time attribute in return_json - - Returns - -------- - time : string - Time in the type of postgresql timestamp without time zone - """ - - time = datetime.datetime.fromtimestamp(abs(input)/1000).strftime('%Y-%m-%d %H:%M:%S') - return time - -def mapserver_name(mapserver_n): - """ - Function to return the mapserver name from integer - - Parameters - ------------ - mapserver_n : numeric - The number of mapserver we will be accessing. 
0 for 'cot_geospatial' - - Returns - -------- - mapserver_name : string - The name of the mapserver - """ - - if mapserver_n == 0: - mapserver_name = 'cot_geospatial' - else: - mapserver_name = 'cot_geospatial' + str(mapserver_n) - - return(mapserver_name) - -def get_tablename(mapserver, layer_id): - """ - Function to return the name of the layer - - Parameters - ----------- - mapserver: string - The name of the mapserver we are accessing, returned from function mapserver_name - - layer_id: integer - Unique layer id that represent a single layer in the mapserver - - Returns - -------- - output_name - The table name of the layer in database - """ - - url = 'https://insideto-gis.toronto.ca/arcgis/rest/services/'+mapserver+'/MapServer/layers?f=json' - try: - r = requests.get(url, verify = False, timeout = 20) - r.raise_for_status() - except requests.exceptions.HTTPError as err_h: - LOGGER.error("Invalid HTTP response: ", err_h) - except requests.exceptions.ConnectionError as err_c: - LOGGER.error("Network problem: ", err_c) - except requests.exceptions.Timeout as err_t: - LOGGER.error("Timeout: ", err_t) - except requests.exceptions.RequestException as err: - LOGGER.error("Error: ", err) - else: - ajson = r.json() - layers = ajson['layers'] - for layer in layers: - if layer['id'] == layer_id: - output_name = (layer['name'].lower()).replace(' ', '_') - return output_name - -def find_limit(return_json): - """ - Function to check if last query return all rows - - Parameters - ----------- - return_json : json - Resulted json response from calling the api, returned from function get_data - - Returns - -------- - keep_adding : Boolean - boolean 'keep_adding' indicating if last query returned all rows in the layer - """ - - if return_json.get('exceededTransferLimit', False) == True: - keep_adding = True - else: - keep_adding = False - return keep_adding - -def get_fieldtype(field): - if field == 'esriFieldTypeInteger' or field == 'esriFieldTypeSingle' or field == 'esriFieldTypeInteger' or field=='esriFieldTypeOID' or field == 'esriFieldTypeSmallInteger' or field =='esriFieldGlobalID': - fieldtype = 'integer' - elif field == 'esriFieldTypeString': - fieldtype = 'text' - elif field == 'esriFieldTypeDouble': - fieldtype = 'numeric' - elif field == 'esriFieldTypeDate': - fieldtype = 'timestamp without time zone' - return fieldtype +# Import functions +try: + from gcc_puller_function import ( get_geometry, + to_time, + mapserver_name, + get_tablename, + find_limit, + get_fieldtype) +except: + raise ImportError("Cannot import puller helper functions.") def create_table(output_table, return_json, schema_name, con): """ @@ -163,7 +45,7 @@ def create_table(output_table, return_json, schema_name, con): primary_key : string Primary key for this layer, returned from dictionary pk_dict - con: Airflow Connection + con: Connection Could be the connection to bigdata or to on-prem server Returns @@ -383,8 +265,7 @@ def manual_get_layer(mapserver, layer_id, schema_name, con): Example: - python gcc_puller_functions.py --mapserver 28 --layer-id 28 - --schema-name gis --con db.cfg + python gcc_layer_puller.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg """ CONFIG.read(con) dbset = CONFIG['DBSETTINGS'] From 34f3db054f1b7ba7afdacaa0efebe386a8dd3924 Mon Sep 17 00:00:00 2001 From: chmnata Date: Fri, 23 Aug 2024 16:02:32 +0000 Subject: [PATCH 07/14] #1028 remove unneccessary packages --- gis/gccview/gcc_puller_functions.py | 48 ++++++++++------------------- 1 file changed, 16 insertions(+), 32 deletions(-) 
diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 38d1698f7..83480a858 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -6,8 +6,6 @@ from psycopg2 import sql from psycopg2.extras import execute_values import logging -from time import sleep -from airflow.exceptions import AirflowFailException import click CONFIG = configparser.ConfigParser() @@ -311,7 +309,6 @@ def get_data(mapserver, layer_id, max_number = None, record_max = None): LOGGER.error("Invalid HTTP response: ", err_h) except requests.exceptions.ConnectionError as err_c: LOGGER.error("Network problem: ", err_c) - sleep(10) except requests.exceptions.Timeout as err_t: LOGGER.error("Timeout: ", err_t) except requests.exceptions.RequestException as err: @@ -613,38 +610,25 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = features = return_json['features'] record_max=(len(features)) max_number = record_max - - if is_audited: - insert_audited_data(output_table, insert_column, return_json, schema_name, con) - else: - insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) - - counter += 1 - keep_adding = find_limit(return_json) - if keep_adding == False: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + # Insert data into the table + if is_audited: + insert_audited_data(output_table, insert_column, return_json, schema_name, con) else: - return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - if is_audited: - insert_audited_data(output_table, insert_column, return_json, schema_name, con) - else: - insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) - - counter += 1 - keep_adding = find_limit(return_json) - if keep_adding == True: - max_number = max_number + record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) + # Update loop variables + counter += 1 + keep_adding = find_limit(return_json) + + if keep_adding: + max_number += record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) if is_audited: - successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) - - ''' - # Raise error if UPSERT failed - if not successful_task_run: - raise AirflowFailException - ''' + try: + successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) + except Exception as err: + LOGGER.exception("Unable to update table %s", err) @click.command() @click.option('--mapserver', '-ms', type = int, required = True, From 0f39ccb22ec7b2ecaad5e2295e44b1de02e8e398 Mon Sep 17 00:00:00 2001 From: chmnata Date: Fri, 23 Aug 2024 18:21:02 +0000 Subject: [PATCH 08/14] #1044 update readme to add new layer --- gis/gccview/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gis/gccview/README.md b/gis/gccview/README.md index 8aa453bb3..6bc861e71 100644 --- a/gis/gccview/README.md +++ b/gis/gccview/README.md @@ -76,6 +76,7 @@ The GCC pipeline will be pulling multiple layers into the `gis_core` and `gis` s |private_road|27|13| |school|28|17| |library|28|28| 
+|pavement_asset|2|36| ## Data Pipeline @@ -92,7 +93,7 @@ In the DAG file, the arguments for each layer are stored in dictionaries called ## Adding new layers to GCC Puller DAG 1. Identify the mapserver_n and layer_id for the layer you wish to add. You can find COT transportation layers here: https://insideto-gis.toronto.ca/arcgis/rest/services/cot_geospatial2/FeatureServer, where mapserver_n is 2 and the layer_id is in brackets after the layer name. 2. Add a new entry to "bigdata_layers" or "ptc_layers" dictionaries in airflow's variable depending on the destination database. -3. If is_audited = True, you must also add a primary key for the new layer to "pk_dict" in the corresponding airflow variable. +3. If is_audited = True, you must also add a primary key for the new layer to "gcc_layers" in the corresponding airflow variable. ## Manually fetch layers From 6a403a1fe06813a5730705916d5ded80302788f5 Mon Sep 17 00:00:00 2001 From: Leo Xu Date: Tue, 1 Oct 2024 18:35:08 +0000 Subject: [PATCH 09/14] #1028 Deduplicate two files for gcc generic puller, combined into one, now can run gcc.puller_functions.py directly to pull a layer --- gis/gccview/gcc_layer_puller.py | 277 ---------------------------- gis/gccview/gcc_puller_functions.py | 199 ++++++++++++++++---- 2 files changed, 163 insertions(+), 313 deletions(-) delete mode 100644 gis/gccview/gcc_layer_puller.py diff --git a/gis/gccview/gcc_layer_puller.py b/gis/gccview/gcc_layer_puller.py deleted file mode 100644 index 32c85f285..000000000 --- a/gis/gccview/gcc_layer_puller.py +++ /dev/null @@ -1,277 +0,0 @@ -import configparser -import requests -import datetime -from psycopg2 import connect -from psycopg2 import sql -from psycopg2.extras import execute_values -import logging -import os -from time import sleep -import click -from pathlib import Path -import configparser -CONFIG = configparser.ConfigParser() -from psycopg2 import connect - -LOGGER = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO) - -# Import functions -try: - from gcc_puller_function import ( get_geometry, - to_time, - mapserver_name, - get_tablename, - find_limit, - get_fieldtype) -except: - raise ImportError("Cannot import puller helper functions.") - -def create_table(output_table, return_json, schema_name, con): - """ - Function to create a new table in postgresql for the layer (for audited tables only) - - Parameter - --------- - output_table : string - Table name for postgresql, returned from function get_tablename - - return_json : json - Resulted json response from calling the api, returned from function get_data - - schema_name : string - The schema in which the table will be inserted into - - primary_key : string - Primary key for this layer, returned from dictionary pk_dict - - con: Connection - Could be the connection to bigdata or to on-prem server - - Returns - -------- - insert_columm : SQL composed - Composed object of column name and types use for creating a new postgresql table - - excluded_column : SQL composed - Composed object that is similar to insert_column, but has 'EXCLUDED.' 
attached before each column name, used for UPSERT query - """ - - fields = return_json['fields'] - insert_column_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) for field in fields] - insert_column_list.append(sql.Identifier('geom')) - insert_column = sql.SQL(',').join(insert_column_list) - - with con: - with con.cursor() as cur: - - col_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) + sql.SQL(' ') + sql.SQL(get_fieldtype(field["type"])) for field in fields] - col_list.append(sql.Identifier('geom') + sql.SQL(' ') + sql.SQL('geometry')) - col_list_string = sql.SQL(',').join(col_list) - - LOGGER.info(col_list_string.as_string(con)) - create_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {schema_table} ({columns})").format(schema_table = sql.Identifier(schema_name, output_table), - columns = col_list_string) - LOGGER.info(create_sql.as_string(con)) - cur.execute(create_sql) - - return insert_column - -def get_data(mapserver, layer_id, max_number = None, record_max = None): - """ - Function to retreive layer data from GCCView rest api - - Parameters - ----------- - mapserver : string - The name of the mapserver we are accessing, returned from function mapserver_name - - layer_id : integer - Unique layer id that represent a single layer in the mapserver - - max_number : integer - Number for parameter `resultOffset` in the query, indicating the number of rows this query is going to skip - - record_max : integer - Number for parameter `resultRecordCount` in the query, indicating the number of rows this query is going to fetch - - Returns - -------- - return_json : json - Resulted json response from calling the GCCView rest api - """ - return_json = None - base_url = "https://insideto-gis.toronto.ca/arcgis/rest/services/{}/MapServer/{}/query".format(mapserver, layer_id) - - query = {"where":"1=1", - "outFields": "*", - "outSR": '4326', - "returnGeometry": "true", - "returnTrueCurves": "false", - "returnIdsOnly": "false", - "returnCountOnly": "false", - "returnZ": "false", - "returnM": "false", - "orderByFields": "OBJECTID", - "returnDistinctValues": "false", - "returnExtentsOnly": "false", - "resultOffset": "{}".format(max_number), - "resultRecordCount": "{}".format(record_max), - "f":"json"} - - for retry in range(3): - try: - r = requests.get(base_url, params = query, verify = False, timeout = 300) - r.raise_for_status() - except requests.exceptions.HTTPError as err_h: - LOGGER.error("Invalid HTTP response: ", err_h) - except requests.exceptions.ConnectionError as err_c: - LOGGER.error("Network problem: ", err_c) - sleep(10) - except requests.exceptions.Timeout as err_t: - LOGGER.error("Timeout: ", err_t) - except requests.exceptions.RequestException as err: - LOGGER.error("Error: ", err) - else: - return_json = r.json() - break - - return return_json - -def insert_data(output_table, insert_column, return_json, schema_name, con): - """ - Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables) - - Parameters - ---------- - output_table : string - Table name for postgresql, returned from function get_tablename - - insert_column : SQL composed - Composed object of column name and types use for creating a new postgresql table - - return_json : json - Resulted json response from calling the api, returned from function get_data - - schema_name : string - The schema in which the table will be inserted into - - con: Airflow Connection - Could be the connection to bigdata or to on-prem server - """ - rows = [] - 
features = return_json['features'] - fields = return_json['fields'] - trials = [[field['name'], field['type']] for field in fields] - - for feature in features: - geom = feature['geometry'] - geometry_type = return_json['geometryType'] - geometry = get_geometry(geometry_type, geom) - - row = [] - for trial in trials: - if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None: - row.append(to_time(feature['attributes'][trial[0]])) - else: - row.append(feature['attributes'][trial[0]]) - - row.append(geometry) - - rows.append(row) - - - insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( - schema_table = sql.Identifier(schema_name, output_table), - columns = insert_column - ) - with con: - with con.cursor() as cur: - execute_values(cur, insert, rows) - LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table) - - -def get_layer(mapserver_n, layer_id, schema_name, con = None): - """ - This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. - - Parameters - ---------- - mapserver : int - The name of the mapserver that host the desired layer - - layer_id : int - The id of desired layer - - schema_name : string - The schema in which the table will be inserted into - - con: connection to database - Connection object that can connect to a particular database - Expects a valid con object if using command prompt - """ - successful_task_run = True - - # At this point, there should must be a con now - if con is None: - LOGGER.error("Unable to establish connection to the database, please pass in a valid con") - return - - mapserver = mapserver_name(mapserver_n) - output_table = get_tablename(mapserver, layer_id) - if output_table is None: - LOGGER.error("Invalid mapserver and/or layer Id") - return - #-------------------------------- - keep_adding = True - counter = 0 - - while keep_adding == True: - - if counter == 0: - return_json = get_data(mapserver, layer_id) - insert_column = create_table(output_table, return_json, schema_name, con) - - features = return_json['features'] - record_max=len(features) - max_number = record_max - else: - return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - - # Insert data into the table - insert_data(output_table, insert_column, return_json, schema_name, con) - - # Update loop variables - counter += 1 - keep_adding = find_limit(return_json) - - if keep_adding: - max_number += record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) - -@click.command() -@click.option('--mapserver', '-m', type = int, required = True, - help = 'Mapserver number, e.g. cotgeospatial_2 will be 2') -@click.option('--layer-id', '-l', type = int, required = True, - help = 'Layer id') -@click.option('--schema-name', '-s', type = str, required = True, - help = 'Name of destination schema') -@click.option('--con', '-c', type = str, default=os.path.expanduser('~/db.cfg'), - help = 'The path to the credential config file. Default is ~/db.cfg') -def manual_get_layer(mapserver, layer_id, schema_name, con): - """ - This script pulls a GIS layer from GCC servers into the databases of the Data and Analytics Unit. 
- - Example: - - python gcc_layer_puller.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg - """ - CONFIG.read(con) - dbset = CONFIG['DBSETTINGS'] - connection_obj = connect(**dbset) - # get_layer function - get_layer(mapserver, layer_id, schema_name, con=connection_obj) - -if __name__ == '__main__': - manual_get_layer() diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 83480a858..1461d2422 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -204,6 +204,56 @@ def create_partitioned_table(output_table, return_json, schema_name, con): return insert_column, output_table +def create_table(output_table, return_json, schema_name, con): + """ + Function to create a new table in postgresql for the layer (for audited tables only) + + Parameter + --------- + output_table : string + Table name for postgresql, returned from function get_tablename + + return_json : json + Resulted json response from calling the api, returned from function get_data + + schema_name : string + The schema in which the table will be inserted into + + primary_key : string + Primary key for this layer, returned from dictionary pk_dict + + con: Connection + Could be the connection to bigdata or to on-prem server + + Returns + -------- + insert_columm : SQL composed + Composed object of column name and types use for creating a new postgresql table + + excluded_column : SQL composed + Composed object that is similar to insert_column, but has 'EXCLUDED.' attached before each column name, used for UPSERT query + """ + + fields = return_json['fields'] + insert_column_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) for field in fields] + insert_column_list.append(sql.Identifier('geom')) + insert_column = sql.SQL(',').join(insert_column_list) + + with con: + with con.cursor() as cur: + + col_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) + sql.SQL(' ') + sql.SQL(get_fieldtype(field["type"])) for field in fields] + col_list.append(sql.Identifier('geom') + sql.SQL(' ') + sql.SQL('geometry')) + col_list_string = sql.SQL(',').join(col_list) + + LOGGER.info(col_list_string.as_string(con)) + create_sql = sql.SQL("CREATE TABLE IF NOT EXISTS {schema_table} ({columns})").format(schema_table = sql.Identifier(schema_name, output_table), + columns = col_list_string) + LOGGER.info(create_sql.as_string(con)) + cur.execute(create_sql) + + return insert_column + # Geometry Switcher def line(geom): return 'SRID=4326;LineString('+','.join(' '.join(str(x) for x in tup) for tup in geom['paths'][0]) +')' @@ -448,6 +498,58 @@ def insert_partitioned_data(schema_parent_table_insert, insert_column, return_js with con.cursor() as cur: execute_values(cur, insert, rows) LOGGER.info('Successfully inserted %d records into %s', len(rows), schema_parent_table_insert) + +def insert_data(output_table, insert_column, return_json, schema_name, con): + """ + Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables) + + Parameters + ---------- + output_table : string + Table name for postgresql, returned from function get_tablename + + insert_column : SQL composed + Composed object of column name and types use for creating a new postgresql table + + return_json : json + Resulted json response from calling the api, returned from function get_data + + schema_name : string + The schema in which the table will be inserted into + + con: Airflow Connection + Could be the connection to 
bigdata or to on-prem server + """ + rows = [] + features = return_json['features'] + fields = return_json['fields'] + trials = [[field['name'], field['type']] for field in fields] + + for feature in features: + geom = feature['geometry'] + geometry_type = return_json['geometryType'] + geometry = get_geometry(geometry_type, geom) + + row = [] + for trial in trials: + if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None: + row.append(to_time(feature['attributes'][trial[0]])) + else: + row.append(feature['attributes'][trial[0]]) + + row.append(geometry) + + rows.append(row) + + + insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( + schema_table = sql.Identifier(schema_name, output_table), + columns = insert_column + ) + with con: + with con.cursor() as cur: + execute_values(cur, insert, rows) + LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table) def update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con): """ @@ -547,7 +649,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche return successful_execution #------------------------------------------------------------------------------------------------------- # base main function, also compatible with Airflow -def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None): +def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_manual = False): """ This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. @@ -573,8 +675,7 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = Connection object that can connect to a particular database Expects a valid con object if using command prompt """ - successful_task_run = True - + # For Airflow DAG if cred is not None: con = cred.get_conn() @@ -590,46 +691,71 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = LOGGER.error("Invalid mapserver and/or layer Id") return #-------------------------------- - if is_audited and primary_key is None: - LOGGER.error("Audited tables should have a primary key.") - if not(is_audited) and primary_key is not None: - LOGGER.error("Non-audited tables do not use the primary key.") - #-------------------------------- keep_adding = True counter = 0 - - while keep_adding == True: + #-------------------------------- + if is_manual: + while keep_adding == True: + + if counter == 0: + return_json = get_data(mapserver, layer_id) + insert_column = create_table(output_table, return_json, schema_name, con) + + features = return_json['features'] + record_max=len(features) + max_number = record_max + else: + return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) + + # Insert data into the table + insert_data(output_table, insert_column, return_json, schema_name, con) - if counter == 0: + # Update loop variables + counter += 1 + keep_adding = find_limit(return_json) + + if keep_adding: + max_number += record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + else: + if is_audited and primary_key is None: + LOGGER.error("Audited tables should have a primary key.") + if not(is_audited) and primary_key is not None: + LOGGER.error("Non-audited tables do not use the primary key.") + 
#-------------------------------- + while keep_adding == True: return_json = get_data(mapserver, layer_id) + + if counter == 0: + if is_audited: + (insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con) + else: + (insert_column, schema_parent_table_insert) = create_partitioned_table(output_table, return_json, schema_name, con) + + features = return_json['features'] + record_max=(len(features)) + max_number = record_max + # Insert data into the table if is_audited: - (insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con) + insert_audited_data(output_table, insert_column, return_json, schema_name, con) else: - (insert_column, schema_parent_table_insert) = create_partitioned_table(output_table, return_json, schema_name, con) - - features = return_json['features'] - record_max=(len(features)) - max_number = record_max - # Insert data into the table - if is_audited: - insert_audited_data(output_table, insert_column, return_json, schema_name, con) - else: - insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) - # Update loop variables - counter += 1 - keep_adding = find_limit(return_json) + insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) + # Update loop variables + counter += 1 + keep_adding = find_limit(return_json) - if keep_adding: - max_number += record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + if keep_adding: + max_number += record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) - if is_audited: - try: - successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) - except Exception as err: - LOGGER.exception("Unable to update table %s", err) - + if is_audited: + try: + update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) + except Exception as err: + LOGGER.exception("Unable to update table %s", err) + @click.command() @click.option('--mapserver', '-ms', type = int, required = True, help = 'Mapserver number, e.g. 
cotgeospatial_2 will be 2') @@ -663,7 +789,8 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, schema_name = schema_name, is_audited = is_audited, primary_key = primary_key, - con=connection_obj + con=connection_obj, + is_manual = True ) if __name__ == '__main__': From 37b8b8f685097237bc9cfe9e00557543b20d60f3 Mon Sep 17 00:00:00 2001 From: Leo Xu Date: Tue, 1 Oct 2024 19:41:52 +0000 Subject: [PATCH 10/14] #1028 Bug fixes for Airflow part of get_layer function for huge layer --- gis/gccview/gcc_puller_functions.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 1461d2422..3a2142364 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -725,9 +725,10 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = LOGGER.error("Non-audited tables do not use the primary key.") #-------------------------------- while keep_adding == True: - return_json = get_data(mapserver, layer_id) - if counter == 0: + # Without specifying the max_number and record_max, the number of data we get from this query + # should be the max record count of the layer if the layer is huge + return_json = get_data(mapserver, layer_id) if is_audited: (insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con) else: @@ -736,7 +737,10 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = features = return_json['features'] record_max=(len(features)) max_number = record_max - # Insert data into the table + else: + return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) + + # Insert data into the table if is_audited: insert_audited_data(output_table, insert_column, return_json, schema_name, con) else: From c89aab5741f7ee3b0c0b4ad5c0267ad32c6506c9 Mon Sep 17 00:00:00 2001 From: Leo Xu Date: Thu, 3 Oct 2024 17:13:09 +0000 Subject: [PATCH 11/14] #1028 Deduplicate get_layer function and combine three insert_data functions into one --- gis/gccview/gcc_puller_functions.py | 244 ++++++---------------------- 1 file changed, 47 insertions(+), 197 deletions(-) diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 3a2142364..e5dd365c9 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -141,8 +141,8 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co LOGGER.info(create_sql.as_string(con)) cur.execute(create_sql) - owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name)) - cur.execute(owner_sql) + # owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name)) + # cur.execute(owner_sql) # Add a pk with con: @@ -390,88 +390,13 @@ def find_limit(return_json): keep_adding = False return keep_adding -def insert_audited_data(output_table, insert_column, return_json, schema_name, con): - """ - Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables) - - Parameters - ---------- - output_table : string - Table name for postgresql, returned from function get_tablename - - insert_column : SQL composed - Composed object of column name and types use for creating a new postgresql table - - 
return_json : json - Resulted json response from calling the api, returned from function get_data - - schema_name : string - The schema in which the table will be inserted into - - con: Airflow Connection - Could be the connection to bigdata or to on-prem server - """ +def insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned): rows = [] features = return_json['features'] fields = return_json['fields'] trials = [[field['name'], field['type']] for field in fields] - - for feature in features: - geom = feature['geometry'] - geometry_type = return_json['geometryType'] - geometry = get_geometry(geometry_type, geom) - - row = [] - for trial in trials: - if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None: - row.append(to_time(feature['attributes'][trial[0]])) - else: - row.append(feature['attributes'][trial[0]]) - - row.append(geometry) - - rows.append(row) - - # Since this is a temporary table, name it '_table' as opposed to 'table' for now (for audited tables) - temp_table_name = '_' + output_table - - insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( - schema_table = sql.Identifier(schema_name, temp_table_name), - columns = insert_column - ) - with con: - with con.cursor() as cur: - execute_values(cur, insert, rows) - LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table) - -def insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con): - """ - Function to insert data to our postgresql database (for partitioned tables) - - Parameters - ---------- - schema_parent_table_insert : string - Table name for postgresql, returned from function create_partitioned_table - - insert_column : SQL composed - Composed object of column name and types use for creating a new postgresql table - - return_json : json - Resulted json response from calling the api, returned from function get_data - - schema_name : string - The schema in which the table will be inserted into - - con: Airflow Connection - Could be the connection to bigdata or to on-prem server - """ - today_string = datetime.date.today().strftime('%Y-%m-%d') - rows = [] - features = return_json['features'] - fields = return_json['fields'] - trials = [[field['name'], field['type']] for field in fields] for feature in features: geom = feature['geometry'] geometry_type = return_json['geometryType'] @@ -483,69 +408,21 @@ def insert_partitioned_data(schema_parent_table_insert, insert_column, return_js row.append(to_time(feature['attributes'][trial[0]])) else: row.append(feature['attributes'][trial[0]]) - - row.insert(0, today_string) + + if is_partitioned: + row.insert(0, today_string) row.append(geometry) rows.append(row) - - insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( - schema_table = sql.Identifier(schema_name, schema_parent_table_insert), - columns = insert_column - ) - with con: - with con.cursor() as cur: - execute_values(cur, insert, rows) - LOGGER.info('Successfully inserted %d records into %s', len(rows), schema_parent_table_insert) - -def insert_data(output_table, insert_column, return_json, schema_name, con): - """ - Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables) - - Parameters - ---------- - output_table : string - Table name for postgresql, returned from function get_tablename + if is_audited: + output_table = '_' + output_table - insert_column : SQL composed - Composed object of column 
name and types use for creating a new postgresql table - - return_json : json - Resulted json response from calling the api, returned from function get_data - - schema_name : string - The schema in which the table will be inserted into - - con: Airflow Connection - Could be the connection to bigdata or to on-prem server - """ - rows = [] - features = return_json['features'] - fields = return_json['fields'] - trials = [[field['name'], field['type']] for field in fields] - - for feature in features: - geom = feature['geometry'] - geometry_type = return_json['geometryType'] - geometry = get_geometry(geometry_type, geom) - - row = [] - for trial in trials: - if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None: - row.append(to_time(feature['attributes'][trial[0]])) - else: - row.append(feature['attributes'][trial[0]]) - - row.append(geometry) - - rows.append(row) - - insert=sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format( schema_table = sql.Identifier(schema_name, output_table), columns = insert_column ) + with con: with con.cursor() as cur: execute_values(cur, insert, rows) @@ -649,7 +526,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche return successful_execution #------------------------------------------------------------------------------------------------------- # base main function, also compatible with Airflow -def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_manual = False): +def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = False): """ This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. 
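The paging contract this refactor leans on is easy to lose in the diff: the first request is issued without `resultOffset`/`resultRecordCount`, so the size of that response reveals the layer's maximum record count, and every later request pages forward by that amount until the server stops reporting `exceededTransferLimit`. A minimal standalone sketch of that loop, assuming the same GCC query endpoint that `get_data` uses; the helper name `fetch_all_features` is illustrative and not part of the repo:

```python
# Illustrative paging sketch, not the repo's get_layer/get_data code.
import requests

def fetch_all_features(mapserver: str, layer_id: int) -> list:
    """Page through a GCC ArcGIS REST layer until exceededTransferLimit clears."""
    base_url = (
        "https://insideto-gis.toronto.ca/arcgis/rest/services/"
        f"{mapserver}/MapServer/{layer_id}/query"
    )
    query = {"where": "1=1", "outFields": "*", "outSR": "4326",
             "returnGeometry": "true", "orderByFields": "OBJECTID", "f": "json"}
    features, offset, page_size = [], None, None
    while True:
        if offset is not None:
            # After the first request, page with resultOffset/resultRecordCount,
            # mirroring the counter > 0 branch described in the diff above.
            query.update({"resultOffset": offset, "resultRecordCount": page_size})
        resp = requests.get(base_url, params=query, verify=False, timeout=300)
        resp.raise_for_status()
        payload = resp.json()
        batch = payload.get("features", [])
        features.extend(batch)
        if offset is None:
            # The first, unbounded request returns at most the layer's max record
            # count; that count becomes the page size for every later request.
            page_size = len(batch)
            offset = page_size
        else:
            offset += page_size
        if not payload.get("exceededTransferLimit", False):
            return features
```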
@@ -694,71 +571,42 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = keep_adding = True counter = 0 #-------------------------------- - if is_manual: - while keep_adding == True: - - if counter == 0: - return_json = get_data(mapserver, layer_id) - insert_column = create_table(output_table, return_json, schema_name, con) - - features = return_json['features'] - record_max=len(features) - max_number = record_max - else: - return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - - # Insert data into the table - insert_data(output_table, insert_column, return_json, schema_name, con) - - # Update loop variables - counter += 1 - keep_adding = find_limit(return_json) - - if keep_adding: - max_number += record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) - else: - if is_audited and primary_key is None: + if is_audited and primary_key is None: LOGGER.error("Audited tables should have a primary key.") - if not(is_audited) and primary_key is not None: - LOGGER.error("Non-audited tables do not use the primary key.") - #-------------------------------- - while keep_adding == True: - if counter == 0: - # Without specifying the max_number and record_max, the number of data we get from this query - # should be the max record count of the layer if the layer is huge - return_json = get_data(mapserver, layer_id) - if is_audited: - (insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con) - else: - (insert_column, schema_parent_table_insert) = create_partitioned_table(output_table, return_json, schema_name, con) - - features = return_json['features'] - record_max=(len(features)) - max_number = record_max - else: - return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - - # Insert data into the table + if not(is_audited) and primary_key is not None: + LOGGER.error("Non-audited tables do not use the primary key.") + #-------------------------------- + while keep_adding == True: + if counter == 0: + return_json = get_data(mapserver, layer_id) if is_audited: - insert_audited_data(output_table, insert_column, return_json, schema_name, con) + (insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con) + elif is_partitioned: + (insert_column, output_table) = create_partitioned_table(output_table, return_json, schema_name, con) else: - insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) - # Update loop variables - counter += 1 - keep_adding = find_limit(return_json) + insert_column = create_table(output_table, return_json, schema_name, con) + features = return_json['features'] + record_max=(len(features)) + max_number = record_max + else: + return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - if keep_adding: - max_number += record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned) + + counter += 1 + keep_adding = find_limit(return_json) + + if keep_adding: + max_number += record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + + if is_audited: + 
try: + update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) + except Exception as err: + LOGGER.exception("Unable to update table %s", err) - if is_audited: - try: - update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) - except Exception as err: - LOGGER.exception("Unable to update table %s", err) @click.command() @click.option('--mapserver', '-ms', type = int, required = True, @@ -768,12 +616,14 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = @click.option('--schema-name', '-s', type = str, required = True , help = 'Name of destination schema') @click.option('--is-audited', '-a', is_flag=True, show_default=True, default=False, - help = 'Whether the table is supposed to be audited (T) or partitioned (F)') + help = 'Whether the table is supposed to be audited (T) or non-audited(F)') @click.option('--primary-key', '-pk', type = str, default=None, required = False, help = 'Primary key. Only include if table is audited.') @click.option('--con', '-c', type = str, required = True, help = 'The path to the credential config file') -def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con): +@click.option('--is-partitioned', '-p', is_flag=True, show_default=True, default=False, + help = 'Whether the table is supposed to be partitioned (T) or not partitioned (F)') +def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con, is_partitioned=True): """ This script pulls a GIS layer from GCC servers into the databases of the Data and Analytics Unit. @@ -794,8 +644,8 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, is_audited = is_audited, primary_key = primary_key, con=connection_obj, - is_manual = True + is_partitioned = is_partitioned ) if __name__ == '__main__': - manual_get_layer() + manual_get_layer() \ No newline at end of file From 3c3470904830b65830cd1a8d862b0e1de956db1a Mon Sep 17 00:00:00 2001 From: Leo Xu Date: Thu, 3 Oct 2024 20:31:51 +0000 Subject: [PATCH 12/14] #1028 Update comments for gcc puller functions and updae README.md --- gis/gccview/README.md | 37 ++++++++++++++---- gis/gccview/gcc_puller_functions.py | 60 +++++++++++++++++++---------- 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/gis/gccview/README.md b/gis/gccview/README.md index 6bc861e71..7fab04f06 100644 --- a/gis/gccview/README.md +++ b/gis/gccview/README.md @@ -80,15 +80,18 @@ The GCC pipeline will be pulling multiple layers into the `gis_core` and `gis` s ## Data Pipeline -The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means: +The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in eight parameters. 
Here is a list that describes what each parameter means: - mapserver_n (int): ID of the mapserver that host the desired layer - layer_id (int): ID of the layer within the particular mapserver - schema_name (string): name of destination schema -- is_audited (Boolean): True if the layer will be in a table that is audited, False if the layer will be inserted as a child table part of a parent table +- is_audited (Boolean): True if the layer will be in a table that is audited, False if the layer will be non-audited - cred (Airflow PostgresHook): the Airflow PostgresHook that directs to credentials to enable a connection to a particular database +- con (used when manually pull): the path to the credential config file. Default is ~/db.cfg +- primary_key (used when pulling an audited table): primary key for this layer, returned from dictionary pk_dict when pulling for the Airflow DAG, set it manually when pulling a layer yourself. +- is_partitioned (Boolean): True if the layer will be inserted as a child table part of a parent table, False if the layer will be neither audited nor partitioned. -In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG will be executed once every 3 months, particularly on the 15th of every March, June, September, and December every year. +In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG will be executed once every 3 months, particularly on the 15th of every March, June, September, and December every year. The DAG will pull either audited table or partitioned table since the "is_partitioned" argument is not stored in dictionaries and are set to default value True. ## Adding new layers to GCC Puller DAG 1. Identify the mapserver_n and layer_id for the layer you wish to add. You can find COT transportation layers here: https://insideto-gis.toronto.ca/arcgis/rest/services/cot_geospatial2/FeatureServer, where mapserver_n is 2 and the layer_id is in brackets after the layer name. @@ -97,7 +100,7 @@ In the DAG file, the arguments for each layer are stored in dictionaries called ## Manually fetch layers -If you need to pull a layer as a one-off task, this script allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on a on-prem server as it needs connection to insideto. +If you need to pull a layer as a one-off task, `gcc_puller_functions.py` allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on a on-prem server as it needs connection to insideto. Before running the script, ensure that you have set up the appropriate environment with all necessary packages installed. You might have to set the `https_proxy` in your environment with your novell username and password in order to clone this repo or install packages. If you run into any issues, don't hestitate to ask a sysadmin. You can then install all packages in the `requirement.txt`, either with: 1) Activate your virtual environment, it should automatically install them for you @@ -120,7 +123,7 @@ Before running the script, ensure that you have set up the appropriate environme Now you are set to run the script! -There are 4 inputs that need to be entered. +There are 7 inputs that can be entered. `--mapserver`: Mapserver number, e.g. cotgeospatial_2 will be 2 @@ -130,9 +133,29 @@ There are 4 inputs that need to be entered. 
`--con`(optional): The path to the credential config file. Default is ~/db.cfg -Example of pulling the library layer to the gis schema. +`--is_audited`: Whether table will be audited or not, specify the option on the command line will set this option to True; while not specifying will give the default False. + +`primary_key`(required when pulling an audited table): Primary key for the layer + +`is_partitioned`: Whether table will be a child table of a parent table or with no feature, specify the option on the command line will set this option to True; while not specifying will give the default False. + +Example of pulling the library layer (table with no feature) to the gis schema. + + +```python +python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg +``` + +Example of pulling the intersection layer (partitioned) to the gis_core schema. ```python -python gcc_layer_puller.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg +python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 12 --layer-id 42 --schema-name gis_core --con db.cfg --is-partitioned ``` + +Example of pulling the city_ward layer (partitioned) to the gis_core schema. + + +```python +python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 0 --layer-id 0 --schema-name gis_core --con db.cfg --is-audited --primary-key area_id +``` \ No newline at end of file diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index e5dd365c9..8a4fa32c6 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -42,10 +42,10 @@ def get_tablename(mapserver, layer_id): Parameters ----------- - mapserver: string + mapserver : string The name of the mapserver we are accessing, returned from function mapserver_name - layer_id: integer + layer_id : integer Unique layer id that represent a single layer in the mapserver Returns @@ -103,7 +103,7 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co primary_key : string Primary key for this layer, returned from dictionary pk_dict - con: Airflow Connection + con : Airflow Connection Could be the connection to bigdata or to on-prem server Returns @@ -141,8 +141,8 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co LOGGER.info(create_sql.as_string(con)) cur.execute(create_sql) - # owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name)) - # cur.execute(owner_sql) + owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name)) + cur.execute(owner_sql) # Add a pk with con: @@ -166,7 +166,7 @@ def create_partitioned_table(output_table, return_json, schema_name, con): schema_name : string The schema in which the table will be inserted into - con: Airflow Connection + con : Airflow Connection Could be the connection to bigdata or to on-prem server Returns @@ -206,7 +206,7 @@ def create_partitioned_table(output_table, return_json, schema_name, con): def create_table(output_table, return_json, schema_name, con): """ - Function to create a new table in postgresql for the layer (for audited tables only) + Function to create a new table in postgresql for the layer (for regular table) Parameter --------- @@ -219,19 +219,13 @@ def create_table(output_table, return_json, schema_name, con): schema_name : 
string The schema in which the table will be inserted into - primary_key : string - Primary key for this layer, returned from dictionary pk_dict - - con: Connection + con : Connection Could be the connection to bigdata or to on-prem server Returns -------- insert_columm : SQL composed Composed object of column name and types use for creating a new postgresql table - - excluded_column : SQL composed - Composed object that is similar to insert_column, but has 'EXCLUDED.' attached before each column name, used for UPSERT query """ fields = return_json['fields'] @@ -391,6 +385,29 @@ def find_limit(return_json): return keep_adding def insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned): + """ + Function to insert data to our postgresql database + Parameters + ---------- + output_table : string + Table name for postgresql, returned from function get_tablename + insert_column : SQL composed + Composed object of column name and types use for creating a new postgresql table + return_json : json + Resulted json response from calling the api, returned from function get_data + + schema_name : string + The schema in which the table will be inserted into + + con : Airflow Connection + Could be the connection to bigdata or to on-prem server + + is_audited : Boolean + Whether we want to have the table be audited (true) or be non-audited (false) + + is_partitioned : Boolean + Whether we want to have the table be partitioned (true) or neither audited nor partitioned(false) + """ rows = [] features = return_json['features'] fields = return_json['fields'] @@ -450,7 +467,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche schema_name : string The schema in which the table will be inserted into - con: Airflow Connection + con : Airflow Connection Could be the connection to bigdata or to on-prem server Returns @@ -526,7 +543,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche return successful_execution #------------------------------------------------------------------------------------------------------- # base main function, also compatible with Airflow -def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = False): +def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = True): """ This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. 
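To make the docstrings above concrete, here is a hedged sketch of the branching the merged `insert_data` is described to perform: audited layers are staged into a temporary table named with a leading underscore, partitioned layers have `version_date` prepended to every row, and the rows are then written with `execute_values`. The helper names `target_and_rows` and `run_insert` are illustrative only, not the repo's functions:

```python
# Illustrative sketch of the audited / partitioned branching, under the
# assumptions stated above; insert_column is a psycopg2 sql.Composed of columns.
import datetime
from psycopg2 import sql
from psycopg2.extras import execute_values

def target_and_rows(output_table, rows, is_audited, is_partitioned):
    today = datetime.date.today().strftime('%Y-%m-%d')
    if is_audited:
        # Audited layers are staged in "_<table>" and UPSERTed afterwards.
        output_table = '_' + output_table
    elif is_partitioned:
        # Partitioned layers carry the pull date as the partition key column.
        rows = [[today] + row for row in rows]
    return output_table, rows

def run_insert(con, schema_name, output_table, insert_column, rows):
    insert = sql.SQL("INSERT INTO {schema_table} ({columns}) VALUES %s").format(
        schema_table=sql.Identifier(schema_name, output_table),
        columns=insert_column,
    )
    with con, con.cursor() as cur:
        execute_values(cur, insert, rows)
```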
@@ -541,16 +558,19 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = schema_name : string The schema in which the table will be inserted into - is_audited: Boolean - Whether we want to have the table be audited (true) or be partitioned (false) + is_audited : Boolean + Whether we want to have the table be audited (true) or be non-audited (false) - cred: Airflow PostgresHook + cred : Airflow PostgresHook Contains credentials to enable a connection to a database Expects a valid cred input when running Airflow DAG - con: connection to database + con : connection to database Connection object that can connect to a particular database Expects a valid con object if using command prompt + + is_partitioned : Boolean + Whether we want to have the table be partitioned (true) or neither audited nor partitioned(false) """ # For Airflow DAG From 8c4bf9603b115216de6794e14ed340a59e3c503e Mon Sep 17 00:00:00 2001 From: Leo Xu Date: Mon, 7 Oct 2024 15:15:29 +0000 Subject: [PATCH 13/14] #1028 Bug fixes for audited tables --- gis/gccview/gcc_puller_functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 8a4fa32c6..100ba4224 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -426,7 +426,7 @@ def insert_data(output_table, insert_column, return_json, schema_name, con, is_a else: row.append(feature['attributes'][trial[0]]) - if is_partitioned: + if (not is_audited) and is_partitioned: row.insert(0, today_string) row.append(geometry) From 8fd1985f83bc9dbce259636d69e00e8b2ed51cda Mon Sep 17 00:00:00 2001 From: Leo Xu Date: Mon, 7 Oct 2024 15:57:36 +0000 Subject: [PATCH 14/14] #1028 Deleted outdated layer pulling ipynb file --- gis/gccview/gcc_puller.ipynb | 967 ----------------------------------- 1 file changed, 967 deletions(-) delete mode 100644 gis/gccview/gcc_puller.ipynb diff --git a/gis/gccview/gcc_puller.ipynb b/gis/gccview/gcc_puller.ipynb deleted file mode 100644 index fbeaf67f1..000000000 --- a/gis/gccview/gcc_puller.ipynb +++ /dev/null @@ -1,967 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "4b81dce7-5261-414d-bab3-de188ac69eb5", - "metadata": {}, - "outputs": [], - "source": [ - "import configparser\n", - "import requests\n", - "import datetime\n", - "from psycopg2 import connect\n", - "from psycopg2 import sql\n", - "from psycopg2.extras import execute_values\n", - "import logging\n", - "from time import sleep\n", - "#from airflow.exceptions import AirflowFailException\n", - "import click\n", - "CONFIG = configparser.ConfigParser()" - ] - }, - { - "cell_type": "markdown", - "id": "c6c3c63b-17c8-4c35-b735-9957520f4771", - "metadata": {}, - "source": [ - "## Config [Enter things here]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4f8e2297-98b2-4b0d-8c3f-74533aa7d68a", - "metadata": {}, - "outputs": [], - "source": [ - "# Enter your .cfg file's path\n", - "# e.g. 
CONFIG.read('/home/bqu/db_morbius.cfg')\n", - "CONFIG.read('/home/bqu/db_ec2.cfg')\n", - "dbset = CONFIG['DBSETTINGS']\n", - "connection_obj = connect(**dbset)" - ] - }, - { - "cell_type": "markdown", - "id": "b8df7af6-ebdb-4fbc-8de1-fa0806b3756a", - "metadata": { - "tags": [] - }, - "source": [ - "## Logger" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ae07788e-24fd-4aa4-9c36-c90c627c7046", - "metadata": {}, - "outputs": [], - "source": [ - "\"\"\"The following provides information about the code when it is running and prints out the log messages \n", - "if they are of logging level equal to or greater than INFO\"\"\"\n", - "LOGGER = logging.getLogger(__name__)\n", - "logging.basicConfig(level=logging.INFO)" - ] - }, - { - "cell_type": "markdown", - "id": "ede82259-0284-438f-9a0c-12b479d2198a", - "metadata": { - "tags": [] - }, - "source": [ - "## Get mapserver name and generate table name" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f4905567-4aac-4bd8-aa73-138a485bc2d9", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "def mapserver_name(mapserver_n):\n", - " \"\"\"\n", - " Function to return the mapserver name from integer\n", - " \n", - " Parameters\n", - " ------------\n", - " mapserver_n : numeric\n", - " The number of mapserver we will be accessing. 0 for 'cot_geospatial'\n", - " \n", - " Returns\n", - " --------\n", - " mapserver_name : string\n", - " The name of the mapserver\n", - " \"\"\"\n", - " \n", - " if mapserver_n == 0:\n", - " mapserver_name = 'cot_geospatial'\n", - " else:\n", - " mapserver_name = 'cot_geospatial' + str(mapserver_n)\n", - " \n", - " return(mapserver_name)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c82fe4d8-1d42-48ff-9134-b20cf0d059f2", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "def get_tablename(mapserver, layer_id):\n", - " \"\"\"\n", - " Function to return the name of the layer\n", - "\n", - " Parameters\n", - " -----------\n", - " mapserver: string\n", - " The name of the mapserver we are accessing, returned from function mapserver_name\n", - " \n", - " layer_id: integer\n", - " Unique layer id that represent a single layer in the mapserver\n", - " \n", - " Returns\n", - " --------\n", - " output_name\n", - " The table name of the layer in database\n", - " \"\"\"\n", - " \n", - " url = 'https://insideto-gis.toronto.ca/arcgis/rest/services/'+mapserver+'/MapServer/layers?f=json'\n", - " try:\n", - " r = requests.get(url, verify = False, timeout = 20)\n", - " r.raise_for_status()\n", - " except requests.exceptions.HTTPError as err_h:\n", - " LOGGER.error(\"Invalid HTTP response: \", err_h)\n", - " except requests.exceptions.ConnectionError as err_c:\n", - " LOGGER.error(\"Network problem: \", err_c)\n", - " except requests.exceptions.Timeout as err_t:\n", - " LOGGER.error(\"Timeout: \", err_t)\n", - " except requests.exceptions.RequestException as err:\n", - " LOGGER.error(\"Error: \", err)\n", - " else:\n", - " ajson = r.json()\n", - " layers = ajson['layers']\n", - " for layer in layers:\n", - " if layer['id'] == layer_id:\n", - " output_name = (layer['name'].lower()).replace(' ', '_')\n", - " else:\n", - " continue\n", - "\n", - " return output_name" - ] - }, - { - "cell_type": "markdown", - "id": "621441e0-033c-4cde-8c69-94fc22b70c5e", - "metadata": { - "tags": [] - }, - "source": [ - "## Create table in DB" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "469f189b-0d9e-418d-9826-3ea795aa14cd", - 
"metadata": {}, - "outputs": [], - "source": [ - "def get_fieldtype(field):\n", - " if field == 'esriFieldTypeInteger' or field == 'esriFieldTypeSingle' or field == 'esriFieldTypeInteger' or field=='esriFieldTypeOID' or field == 'esriFieldTypeSmallInteger' or field =='esriFieldGlobalID':\n", - " fieldtype = 'integer'\n", - " elif field == 'esriFieldTypeString':\n", - " fieldtype = 'text'\n", - " elif field == 'esriFieldTypeDouble':\n", - " fieldtype = 'numeric'\n", - " elif field == 'esriFieldTypeDate':\n", - " fieldtype = 'timestamp without time zone'\n", - " return fieldtype" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e30548ec-a710-4d85-9bf7-1a96f9c7b208", - "metadata": {}, - "outputs": [], - "source": [ - "def create_audited_table(output_table, return_json, schema_name, primary_key, con):\n", - " \"\"\"\n", - " Function to create a new table in postgresql for the layer (for audited tables only)\n", - "\n", - " Parameter\n", - " ---------\n", - " output_table : string\n", - " Table name for postgresql, returned from function get_tablename\n", - "\n", - " return_json : json\n", - " Resulted json response from calling the api, returned from function get_data\n", - " \n", - " schema_name : string\n", - " The schema in which the table will be inserted into\n", - " \n", - " primary_key : string\n", - " Primary key for this layer, returned from dictionary pk_dict\n", - " \n", - " con: Airflow Connection\n", - " Could be the connection to bigdata or to on-prem server\n", - "\n", - " Returns\n", - " --------\n", - " insert_columm : SQL composed\n", - " Composed object of column name and types use for creating a new postgresql table\n", - " \n", - " excluded_column : SQL composed\n", - " Composed object that is similar to insert_column, but has 'EXCLUDED.' 
attached before each column name, used for UPSERT query\n", - " \"\"\"\n", - " \n", - " fields = return_json['fields']\n", - " insert_column_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) for field in fields]\n", - " insert_column_list.append(sql.Identifier('geom'))\n", - " insert_column = sql.SQL(',').join(insert_column_list)\n", - " \n", - " # For audited tables only\n", - " excluded_column_list = [sql.SQL('EXCLUDED.') + sql.Identifier((field['name'].lower()).replace('.', '_')) for field in fields]\n", - " excluded_column_list.append(sql.SQL('EXCLUDED.') + sql.Identifier('geom'))\n", - " excluded_column = sql.SQL(',').join(excluded_column_list)\n", - " \n", - " # Since this is a temporary table, name it '_table' as opposed to 'table' for now\n", - " temp_table_name = '_' + output_table\n", - " \n", - " with con:\n", - " with con.cursor() as cur:\n", - " \n", - " col_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) + sql.SQL(' ') + sql.SQL(get_fieldtype(field[\"type\"])) for field in fields]\n", - " col_list.append(sql.Identifier('geom') + sql.SQL(' ') + sql.SQL('geometry'))\n", - " col_list_string = sql.SQL(',').join(col_list)\n", - " \n", - " LOGGER.info(col_list_string.as_string(con))\n", - " create_sql = sql.SQL(\"CREATE TABLE IF NOT EXISTS {schema_table} ({columns})\").format(schema_table = sql.Identifier(schema_name, temp_table_name),\n", - " columns = col_list_string)\n", - " LOGGER.info(create_sql.as_string(con))\n", - " cur.execute(create_sql)\n", - " \n", - " '''\n", - " owner_sql = sql.SQL(\"ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins\").format(schema_table = sql.Identifier(schema_name, temp_table_name))\n", - " cur.execute(owner_sql)\n", - " '''\n", - " \n", - " # Add a pk\n", - " with con:\n", - " with con.cursor() as cur:\n", - " cur.execute(sql.SQL(\"ALTER TABLE {schema_table} ADD PRIMARY KEY ({pk})\").format(schema_table = sql.Identifier(schema_name, temp_table_name),\n", - " pk = sql.Identifier(primary_key)))\n", - " return insert_column, excluded_column" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c775a329-eeb4-4a3c-91a1-fce73d8835ce", - "metadata": {}, - "outputs": [], - "source": [ - "def create_partitioned_table(output_table, return_json, schema_name, con):\n", - " \"\"\"\n", - " Function to create a new table in postgresql for the layer (for partitioned tables only)\n", - "\n", - " Parameter\n", - " ---------\n", - " output_table : string\n", - " Table name for postgresql, returned from function get_tablename\n", - "\n", - " return_json : json\n", - " Resulted json response from calling the api, returned from function get_data\n", - " \n", - " schema_name : string\n", - " The schema in which the table will be inserted into\n", - " \n", - " con: Airflow Connection\n", - " Could be the connection to bigdata or to on-prem server\n", - "\n", - " Returns\n", - " --------\n", - " insert_columm : SQL composed\n", - " Composed object of column name and types use for creating a new postgresql table\n", - " \n", - " output_table_with_date : string\n", - " Table name with date attached at the end, for partitioned tables in postgresql \n", - " \"\"\"\n", - " \n", - " fields = return_json['fields']\n", - " insert_column_list = [sql.Identifier((field['name'].lower()).replace('.', '_')) for field in fields]\n", - " insert_column_list.insert(0, sql.Identifier('version_date'))\n", - " insert_column_list.append(sql.Identifier('geom'))\n", - " insert_column = sql.SQL(',').join(insert_column_list)\n", - " \n", 
- " # Date format YYYY-MM-DD, for the SQL query\n", - " today_string = datetime.date.today().strftime('%Y-%m-%d')\n", - " # Date format _YYYYMMDD, to be attached at the end of output_table name\n", - " date_attachment = datetime.date.today().strftime('_%Y%m%d')\n", - " output_table_with_date = output_table + date_attachment\n", - " index_name = output_table_with_date + '_idx'\n", - " \n", - " with con:\n", - " with con.cursor() as cur:\n", - " \n", - " create_sql = sql.SQL(\"CREATE TABLE IF NOT EXISTS {schema_child_table} PARTITION OF {schema_parent_table} FOR VALUES IN (%s)\").format(schema_child_table = sql.Identifier(schema_name, output_table_with_date),\n", - " schema_parent_table = sql.Identifier(schema_name, output_table))\n", - " cur.execute(create_sql, (today_string, ))\n", - "\n", - " index_sql = sql.SQL(\"CREATE INDEX {idx_name} ON {schema_child_table} USING gist (geom)\").format(idx_name=sql.Identifier(index_name),\n", - " schema_child_table=sql.Identifier(schema_name, output_table_with_date))\n", - " cur.execute(index_sql)\n", - " \n", - " return insert_column, output_table_with_date" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "eeaa7d40-92c0-4a6e-8528-36f37ac26d42", - "metadata": {}, - "outputs": [], - "source": [ - "# Geometry Switcher \n", - "def line(geom):\n", - " return 'SRID=4326;LineString('+','.join(' '.join(str(x) for x in tup) for tup in geom['paths'][0]) +')'\n", - "def polygon(geom):\n", - " return 'SRID=4326;MultiPolygon((('+','.join(' '.join(str(x) for x in tup) for tup in geom['rings'][0]) +')))'\n", - "def point(geom):\n", - " return 'SRID=4326;Point('+(str(geom['x']))+' '+ (str(geom['y']))+')' \n", - "def get_geometry(geometry_type, geom):\n", - " switcher = {\n", - " 'esriGeometryLine':line,\n", - " 'esriGeometryPolyline': line,\n", - " 'esriGeometryPoint': point,\n", - " 'esriGeometryMultiPolygon': polygon,\n", - " 'esriGeometryPolygon': polygon\n", - " }\n", - " func = switcher.get(geometry_type)\n", - " geometry = (func(geom)) \n", - " return geometry" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "81928a3a-5710-4e06-a995-32db05be8dbc", - "metadata": {}, - "outputs": [], - "source": [ - "def to_time(input):\n", - " \"\"\"\n", - " Convert epoch time to postgresql timestamp without time zone\n", - "\n", - " Parameters\n", - " -----------\n", - " input : string\n", - " Epoch time attribute in return_json\n", - "\n", - " Returns\n", - " --------\n", - " time : string\n", - " Time in the type of postgresql timestamp without time zone\n", - " \"\"\"\n", - " \n", - " time = datetime.datetime.fromtimestamp(abs(input)/1000).strftime('%Y-%m-%d %H:%M:%S')\n", - " return time" - ] - }, - { - "cell_type": "markdown", - "id": "131af05a-bace-4c10-81a7-bbf03d08bbf6", - "metadata": {}, - "source": [ - "## Insert data from ArcGIS to DB" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "89b12d17-fea1-422b-91e8-02f399cbdc90", - "metadata": {}, - "outputs": [], - "source": [ - "def get_data(mapserver, layer_id, max_number = None, record_max = None):\n", - " \"\"\"\n", - " Function to retreive layer data from GCCView rest api\n", - "\n", - " Parameters\n", - " -----------\n", - " mapserver : string\n", - " The name of the mapserver we are accessing, returned from function mapserver_name\n", - "\n", - " layer_id : integer\n", - " Unique layer id that represent a single layer in the mapserver\n", - "\n", - " max_number : integer\n", - " Number for parameter `resultOffset` in the query, indicating the number 
of rows this query is going to skip\n", - "\n", - " record_max : integer\n", - " Number for parameter `resultRecordCount` in the query, indicating the number of rows this query is going to fetch\n", - "\n", - " Returns\n", - " --------\n", - " return_json : json\n", - " Resulted json response from calling the GCCView rest api\n", - " \"\"\"\n", - " \n", - " base_url = \"https://insideto-gis.toronto.ca/arcgis/rest/services/{}/MapServer/{}/query\".format(mapserver, layer_id)\n", - " \n", - " # If the data we want to get is centreline\n", - " if mapserver == 'cot_geospatial' and layer_id == 2:\n", - " query = {\"where\": \"\\\"FEATURE_CODE_DESC\\\" IN ('Collector','Collector Ramp','Expressway','Expressway Ramp','Local','Major Arterial','Major Arterial Ramp','Minor Arterial','Minor Arterial Ramp','Pending')\",\n", - " \"outFields\": \"*\",\n", - " \"outSR\": '4326',\n", - " \"returnGeometry\": \"true\",\n", - " \"returnTrueCurves\": \"false\",\n", - " \"returnIdsOnly\": \"false\",\n", - " \"returnCountOnly\": \"false\",\n", - " \"returnZ\": \"false\",\n", - " \"returnM\": \"false\",\n", - " \"orderByFields\": \"OBJECTID\", \n", - " \"returnDistinctValues\": \"false\",\n", - " \"returnExtentsOnly\": \"false\",\n", - " \"resultOffset\": \"{}\".format(max_number),\n", - " \"resultRecordCount\": \"{}\".format(record_max),\n", - " \"f\":\"json\"}\n", - " else:\n", - " query = {\"where\":\"1=1\",\n", - " \"outFields\": \"*\",\n", - " \"outSR\": '4326',\n", - " \"returnGeometry\": \"true\",\n", - " \"returnTrueCurves\": \"false\",\n", - " \"returnIdsOnly\": \"false\",\n", - " \"returnCountOnly\": \"false\",\n", - " \"returnZ\": \"false\",\n", - " \"returnM\": \"false\",\n", - " \"orderByFields\": \"OBJECTID\", \n", - " \"returnDistinctValues\": \"false\",\n", - " \"returnExtentsOnly\": \"false\",\n", - " \"resultOffset\": \"{}\".format(max_number),\n", - " \"resultRecordCount\": \"{}\".format(record_max),\n", - " \"f\":\"json\"}\n", - " \n", - " while True:\n", - " try:\n", - " r = requests.get(base_url, params = query, verify = False, timeout = 300)\n", - " r.raise_for_status()\n", - " except requests.exceptions.HTTPError as err_h:\n", - " LOGGER.error(\"Invalid HTTP response: \", err_h)\n", - " except requests.exceptions.ConnectionError as err_c:\n", - " LOGGER.error(\"Network problem: \", err_c)\n", - " sleep(10)\n", - " continue\n", - " except requests.exceptions.Timeout as err_t:\n", - " LOGGER.error(\"Timeout: \", err_t)\n", - " except requests.exceptions.RequestException as err:\n", - " LOGGER.error(\"Error: \", err)\n", - " else:\n", - " return_json = r.json()\n", - " break\n", - " \n", - " return return_json" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c7e39cac-10a9-4d42-bbcd-53dda7643374", - "metadata": {}, - "outputs": [], - "source": [ - "def find_limit(return_json):\n", - " \"\"\"\n", - " Function to check if last query return all rows\n", - "\n", - " Parameters\n", - " -----------\n", - " return_json : json\n", - " Resulted json response from calling the api, returned from function get_data\n", - "\n", - " Returns\n", - " --------\n", - " keep_adding : Boolean\n", - " boolean 'keep_adding' indicating if last query returned all rows in the layer\n", - " \"\"\"\n", - " \n", - " if return_json.get('exceededTransferLimit', False) == True:\n", - " keep_adding = True\n", - " else:\n", - " keep_adding = False\n", - " return keep_adding" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ae775756-c82e-4320-b822-13001c4f5fbb", - "metadata": {}, - 
"outputs": [], - "source": [ - "def insert_audited_data(output_table, insert_column, return_json, schema_name, con):\n", - " \"\"\"\n", - " Function to insert data to our postgresql database, the data is inserted into a temp table (for audited tables)\n", - "\n", - " Parameters\n", - " ----------\n", - " output_table : string\n", - " Table name for postgresql, returned from function get_tablename\n", - "\n", - " insert_column : SQL composed\n", - " Composed object of column name and types use for creating a new postgresql table\n", - "\n", - " return_json : json\n", - " Resulted json response from calling the api, returned from function get_data\n", - " \n", - " schema_name : string\n", - " The schema in which the table will be inserted into\n", - " \n", - " con: Airflow Connection\n", - " Could be the connection to bigdata or to on-prem server\n", - " \"\"\"\n", - " rows = []\n", - " features = return_json['features']\n", - " fields = return_json['fields']\n", - " trials = [[field['name'], field['type']] for field in fields]\n", - "\n", - " for feature in features:\n", - " geom = feature['geometry']\n", - " geometry_type = return_json['geometryType']\n", - " geometry = get_geometry(geometry_type, geom)\n", - " \n", - " row = []\n", - " for trial in trials:\n", - " if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None:\n", - " row.append(to_time(feature['attributes'][trial[0]]))\n", - " else:\n", - " row.append(feature['attributes'][trial[0]])\n", - "\n", - " row.append(geometry)\n", - " \n", - " rows.append(row)\n", - " \n", - " # Since this is a temporary table, name it '_table' as opposed to 'table' for now (for audited tables)\n", - " temp_table_name = '_' + output_table\n", - " \n", - " insert=sql.SQL(\"INSERT INTO {schema_table} ({columns}) VALUES %s\").format(\n", - " schema_table = sql.Identifier(schema_name, temp_table_name), \n", - " columns = insert_column\n", - " )\n", - " with con:\n", - " with con.cursor() as cur:\n", - " execute_values(cur, insert, rows)\n", - " LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "cd05ccf3-892f-4e0b-9f5b-1d9f81c5177b", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "def insert_partitioned_data(output_table_with_date, insert_column, return_json, schema_name, con):\n", - " \"\"\"\n", - " Function to insert data to our postgresql database (for partitioned tables)\n", - "\n", - " Parameters\n", - " ----------\n", - " output_table_with_date : string\n", - " Table name for postgresql, returned from function create_partitioned_table\n", - "\n", - " insert_column : SQL composed\n", - " Composed object of column name and types use for creating a new postgresql table\n", - "\n", - " return_json : json\n", - " Resulted json response from calling the api, returned from function get_data\n", - " \n", - " schema_name : string\n", - " The schema in which the table will be inserted into\n", - " \n", - " con: Airflow Connection\n", - " Could be the connection to bigdata or to on-prem server\n", - " \"\"\" \n", - " \n", - " today_string = datetime.date.today().strftime('%Y-%m-%d')\n", - " \n", - " rows = []\n", - " features = return_json['features']\n", - " fields = return_json['fields']\n", - " trials = [[field['name'], field['type']] for field in fields]\n", - " for feature in features:\n", - " geom = feature['geometry']\n", - " geometry_type = return_json['geometryType']\n", - " geometry = 
get_geometry(geometry_type, geom)\n", - " \n", - " row = []\n", - " for trial in trials:\n", - " if trial[1] == 'esriFieldTypeDate' and feature['attributes'][trial[0]] != None:\n", - " row.append(to_time(feature['attributes'][trial[0]]))\n", - " else:\n", - " row.append(feature['attributes'][trial[0]])\n", - "\n", - " row.insert(0, today_string)\n", - " row.append(geometry)\n", - " \n", - " rows.append(row)\n", - "\n", - " \n", - " insert=sql.SQL(\"INSERT INTO {schema_table} ({columns}) VALUES %s\").format(\n", - " schema_table = sql.Identifier(schema_name, output_table_with_date), \n", - " columns = insert_column\n", - " )\n", - " with con:\n", - " with con.cursor() as cur:\n", - " execute_values(cur, insert, rows)\n", - " LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table_with_date)" - ] - }, - { - "cell_type": "markdown", - "id": "4241c229-ee15-4a89-840c-5f346e1a0dde", - "metadata": { - "tags": [] - }, - "source": [ - "## Given mapserver_id and layer_id, get their PK" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "39e23a28-5854-4070-8191-e403b40cfbf9", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "pk_dict = {\n", - "\t\"city_ward\": \"area_id\",\n", - " \"census_tract\": \"area_id\",\n", - " \"neighbourhood_improvement_area\": \"area_id\",\n", - " \"priority_neighbourhood_for_investment\": \"area_id\",\n", - " \"ibms_district\": \"area_id\",\n", - " \"ibms_grid\": \"area_id\",\n", - " \"bikeway\": \"centreline_id\",\n", - " \"traffic_camera\": \"rec_id\",\n", - " \"permit_parking_area\": \"area_long_code\",\n", - " \"prai_transit_shelter\": \"id\",\n", - " \"traffic_bylaw_point\": \"objectid\",\n", - " \"traffic_bylaw_line\": \"objectid\",\n", - " \"loop_detector\": \"id\",\n", - " \"electrical_vehicle_charging_station\": \"id\",\n", - " \"day_care_centre\": \"loc_id\",\n", - " \"middle_childcare_centre\": \"id\",\n", - " \"business_improvement_area\": \"area_id\",\n", - " \"proposed_business_improvement_area\": \"objectid\",\n", - " \"film_permit_all\": \"objectid\",\n", - " \"film_permit_parking_all\": \"objectid\",\n", - " \"hotel\": \"id\",\n", - " \"convenience_store\": \"objectid\",\n", - " \"supermarket\": \"objectid\",\n", - " \"place_of_worship\": \"objectid\",\n", - " \"ymca\": \"objectid\",\n", - " \"aboriginal_organization\": \"id\",\n", - " \"attraction\": \"objectid\",\n", - " \"dropin\": \"objectid\",\n", - " \"early_years_centre\": \"id\",\n", - " \"family_resource_centre\": \"objectid\",\n", - " \"food_bank\": \"objectid\",\n", - " \"longterm_care\": \"id\",\n", - " \"parenting_family_literacy\": \"id\",\n", - " \"retirement_home\": \"id\",\n", - " \"senior_housing\": \"objectid\",\n", - " \"shelter\": \"objectid\",\n", - " \"social_housing\": \"objectid\",\n", - " \"private_road\": \"objectid\",\n", - " \"school\": \"objectid\",\n", - " \"library\": \"id\",\n", - "\t}" - ] - }, - { - "cell_type": "markdown", - "id": "9be34a19-228b-4272-86cf-43247a6580b3", - "metadata": {}, - "source": [ - "## Update audited table (UPSERT)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4d8e1f5e-47ad-4e69-8945-6f70833c8a11", - "metadata": {}, - "outputs": [], - "source": [ - "def update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con):\n", - " \"\"\"\n", - " Function to find differences between existing table and the newly created temp table, then UPSERT,\n", - " the temp table will be dropped in the end (for audited tables only)\n", - "\n", - " 
Parameters\n", - " ----------\n", - " output_table : string\n", - " Table name for postgresql, returned from function get_tablename\n", - "\n", - " insert_column : SQL composed\n", - " Composed object of column name and types use for creating a new postgresql table\n", - " \n", - " excluded_column : SQL composed\n", - " Composed object that is similar to insert_column, but has 'EXCLUDED.' attached before each column name, used for UPSERT query\n", - " \n", - " primary_key : string\n", - " primary key for this layer, returned from dictionary pk_dict\n", - " \n", - " schema_name : string\n", - " The schema in which the table will be inserted into\n", - " \n", - " con: Airflow Connection\n", - " Could be the connection to bigdata or to on-prem server\n", - " \n", - " Returns\n", - " --------\n", - " successful_execution : Boolean\n", - " whether any error had occured during UPSERT process\n", - " \"\"\"\n", - "\n", - " # Boolean to return, whether any error had occured during UPSERT process\n", - " successful_execution = True\n", - "\n", - " # Name the temporary table '_table' as opposed to 'table' for now\n", - " temp_table_name = '_' + output_table\n", - " \n", - " now = datetime.datetime.now()\n", - " date = (str(now.year)+str(now.month)+str(now.day))\n", - " \n", - " # Find if old table exists\n", - " with con:\n", - " with con.cursor() as cur:\n", - " \n", - " cur.execute(sql.SQL(\"SELECT COUNT(1) FROM information_schema.tables WHERE table_schema = %s AND table_name = %s\"), (schema_name, output_table))\n", - " result = cur.fetchone()\n", - " # If table exists\n", - " if result[0] == 1:\n", - " \n", - " try:\n", - " # Delete rows that no longer exist in the new table\n", - " cur.execute(sql.SQL(\"DELETE FROM {schema_tablename} WHERE {pk} IN (SELECT {pk} FROM {schema_tablename} EXCEPT SELECT {pk} FROM {schema_temp_table})\").format(\n", - " schema_tablename = sql.Identifier(schema_name, output_table), \n", - " pk = sql.Identifier(primary_key), \n", - " schema_temp_table = sql.Identifier(schema_name, temp_table_name)))\n", - "\n", - " # And then upsert stuff\n", - " upsert_string = \"INSERT INTO {schema_tablename} ({cols}) SELECT {cols} FROM {schema_temp_table} ON CONFLICT ({pk}) DO UPDATE SET ({cols}) = ({excl_cols}); COMMENT ON TABLE {schema_tablename} IS 'last updated: {date}'\"\n", - " cur.execute(sql.SQL(upsert_string).format(schema_tablename = sql.Identifier(schema_name, output_table),\n", - " schema_temp_table = sql.Identifier(schema_name, temp_table_name),\n", - " pk = sql.Identifier(primary_key),\n", - " cols = insert_column,\n", - " excl_cols = excluded_column,\n", - " date = sql.Identifier(date)))\n", - " LOGGER.info('Updated table %s', output_table)\n", - " except Exception:\n", - " # pass exception to function\n", - " LOGGER.exception(\"Failed to UPSERT\")\n", - " # rollback the previous transaction before starting another\n", - " con.rollback()\n", - " successful_execution = False\n", - " \n", - " # if table does not exist -> create a new one and add to audit list\n", - " else:\n", - " try:\n", - " cur.execute(sql.SQL(\"ALTER TABLE {schema_temp_table} RENAME TO {tablename}; COMMENT ON TABLE {schema_tablename} IS 'last updated: {date}'\").format(\n", - " schema_temp_table = sql.Identifier(schema_name, temp_table_name), \n", - " tablename = sql.Identifier(output_table),\n", - " schema_tablename = sql.Identifier(schema_name, output_table), \n", - " date = sql.Identifier(date)))\n", - "\n", - " \n", - " # Make schema_name and output_table into a single string\n", - " 
target_audit_table = sql.Literal(schema_name + '.' + output_table)\n", - " cur.execute(sql.SQL(\"SELECT {schema}.audit_table({schematable})\").format(schema = sql.Identifier(schema_name), \n", - " schematable = target_audit_table))\n", - " LOGGER.info('New table %s created and added to audit table list', output_table)\n", - " except Exception:\n", - " # pass exception to function\n", - " LOGGER.exception(\"Failed to create new table\")\n", - " # rollback the previous transaction before starting another\n", - " con.rollback()\n", - " successful_execution = False\n", - " \n", - " # And then drop the temp table (if exists)\n", - " cur.execute(sql.SQL(\"DROP TABLE IF EXISTS {schema_temp_table}\").format(schema_temp_table = sql.Identifier(schema_name, temp_table_name)))\n", - " return successful_execution" - ] - }, - { - "cell_type": "markdown", - "id": "5334fbd3-241c-4630-a707-6c0314a99f2f", - "metadata": { - "tags": [] - }, - "source": [ - "## Main function that the Task calls" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5146472b-1a30-4599-b510-5900ba22fea9", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None):\n", - " \"\"\"\n", - " This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database.\n", - "\n", - " Parameters\n", - " ----------\n", - " mapserver : int\n", - " The name of the mapserver that host the desired layer\n", - "\n", - " layer_id : int\n", - " The id of desired layer\n", - " \n", - " schema_name : string\n", - " The schema in which the table will be inserted into\n", - " \n", - " is_audited: Boolean\n", - " Whether we want to have the table be audited (true) or be partitioned (false)\n", - " \n", - " cred: Airflow PostgresHook\n", - " Contains credentials to enable a connection to a database\n", - " Expects a valid cred input when running Airflow DAG\n", - " \n", - " con: connection to database\n", - " Connection object that can connect to a particular database\n", - " Expects a valid con object if using command prompt\n", - " \"\"\"\n", - " successful_task_run = True\n", - "\n", - " # For Airflow DAG\n", - " if cred is not None:\n", - " con = cred.get_conn()\n", - " \n", - " # At this point, there should must be a con now\n", - " if con is None:\n", - " LOGGER.error(\"Unable to establish connection to the database, please pass in a valid con\")\n", - " return\n", - " \n", - " mapserver = mapserver_name(mapserver_n)\n", - " output_table = get_tablename(mapserver, layer_id)\n", - " #--------------------------------\n", - " if is_audited:\n", - " primary_key = pk_dict.get(output_table)\n", - " #--------------------------------\n", - " keep_adding = True\n", - " counter = 0\n", - " \n", - " while keep_adding == True:\n", - " \n", - " if counter == 0:\n", - " return_json = get_data(mapserver, layer_id)\n", - " if is_audited:\n", - " (insert_column, excluded_column) = create_audited_table(output_table, return_json, schema_name, primary_key, con)\n", - " else:\n", - " (insert_column, output_table_with_date) = create_partitioned_table(output_table, return_json, schema_name, con)\n", - " \n", - " features = return_json['features']\n", - " record_max=(len(features))\n", - " max_number = record_max\n", - " \n", - " if is_audited:\n", - " insert_audited_data(output_table, insert_column, return_json, schema_name, con)\n", - " else:\n", - " insert_partitioned_data(output_table_with_date, insert_column, 
return_json, schema_name, con)\n", - " \n", - " counter += 1\n", - " keep_adding = find_limit(return_json)\n", - " if keep_adding == False:\n", - " LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)\n", - " else:\n", - " return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max)\n", - " if is_audited:\n", - " insert_audited_data(output_table, insert_column, return_json, schema_name, con)\n", - " else:\n", - " insert_partitioned_data(output_table_with_date, insert_column, return_json, schema_name, con)\n", - " \n", - " counter += 1\n", - " keep_adding = find_limit(return_json)\n", - " if keep_adding == True:\n", - " max_number = max_number + record_max\n", - " else:\n", - " LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table)\n", - " \n", - " if is_audited:\n", - " successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con)" - ] - }, - { - "cell_type": "markdown", - "id": "9bc6dfd9-055a-4fc2-9345-3e1ec71e275e", - "metadata": {}, - "source": [ - "## Your inputs [Enter things here]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "07bf3cfb-8ed7-424b-a0d9-de4f938cd49a", - "metadata": {}, - "outputs": [], - "source": [ - "mapserver_n = # int\n", - "layer_id = # int\n", - "schema_name = '' # str\n", - "is_audited = True # bool\n", - "get_layer(mapserver_n, layer_id, schema_name, is_audited, con = connection_obj)" - ] - }, - { - "cell_type": "markdown", - "id": "9c4c2fa1-a7c8-4af1-acc6-6ac206e328c2", - "metadata": {}, - "source": [ - "Note that for partitioned tables, you might need to first create the parent table in your personal schema if that is where you want to pull. Please refer to the sql code for creating the parent tables in gis/gccview/sql folder." - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.7" - }, - "toc-autonumbering": false - }, - "nbformat": 4, - "nbformat_minor": 5 -}
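With the notebook gone, the programmatic equivalent of the README's CLI examples looks roughly like the sketch below. It assumes a `~/db.cfg` with a `DB_SETTINGS` section (as in `gcc_layer_puller.py`), that `gcc_puller_functions.py` is importable from the working directory, and it reuses the mapserver and layer numbers from the README examples:

```python
# Hedged usage sketch built from the get_layer signature in this patch series.
import configparser
from pathlib import Path
from psycopg2 import connect
from gcc_puller_functions import get_layer  # assumes the script is on the path

config = configparser.ConfigParser()
config.read(str(Path.home().joinpath('db.cfg')))
con = connect(**config['DB_SETTINGS'])

# Pull the city_ward layer (audited, primary key area_id) into gis_core.
get_layer(0, 0, 'gis_core', True, con=con, primary_key='area_id')

# Pull the intersection layer (partitioned) by flipping the flags instead.
get_layer(12, 42, 'gis_core', False, con=con, is_partitioned=True)
```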