From 3c3470904830b65830cd1a8d862b0e1de956db1a Mon Sep 17 00:00:00 2001
From: Leo Xu
Date: Thu, 3 Oct 2024 20:31:51 +0000
Subject: [PATCH] #1028 Update comments for gcc puller functions and update README.md

---
 gis/gccview/README.md               | 37 ++++++++++++++----
 gis/gccview/gcc_puller_functions.py | 60 +++++++++++++++++++----------
 2 files changed, 70 insertions(+), 27 deletions(-)

diff --git a/gis/gccview/README.md b/gis/gccview/README.md
index 6bc861e71..7fab04f06 100644
--- a/gis/gccview/README.md
+++ b/gis/gccview/README.md
@@ -80,15 +80,18 @@ The GCC pipeline will be pulling multiple layers into the `gis_core` and `gis` s
 
 ## Data Pipeline
 
-The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means:
+The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in eight parameters. Here is a list that describes what each parameter means:
 
 - mapserver_n (int): ID of the mapserver that hosts the desired layer
 - layer_id (int): ID of the layer within the particular mapserver
 - schema_name (string): name of destination schema
-- is_audited (Boolean): True if the layer will be in a table that is audited, False if the layer will be inserted as a child table part of a parent table
+- is_audited (Boolean): True if the layer will be in a table that is audited, False if the layer will be non-audited
 - cred (Airflow PostgresHook): the Airflow PostgresHook that directs to credentials to enable a connection to a particular database
+- con (used when pulling manually): the path to the credential config file. Default is ~/db.cfg
+- primary_key (used when pulling an audited table): primary key for this layer; it is looked up in the dictionary pk_dict when pulling via the Airflow DAG, and must be set manually when pulling a layer yourself.
+- is_partitioned (Boolean): True if the layer will be inserted as a child table of a parent table, False if the layer will be neither audited nor partitioned.
 
-In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG will be executed once every 3 months, particularly on the 15th of every March, June, September, and December every year.
+In the DAG file, the arguments for each layer are stored in dictionaries called "bigdata_layers" and "ptc_layers", in the order above. The DAG will be executed once every 3 months, on the 15th of March, June, September, and December. The DAG only pulls audited or partitioned tables, since the "is_partitioned" argument is not stored in the dictionaries and is left at its default value of True.
 
 ## Adding new layers to GCC Puller DAG
 1. Identify the mapserver_n and layer_id for the layer you wish to add. You can find COT transportation layers here: https://insideto-gis.toronto.ca/arcgis/rest/services/cot_geospatial2/FeatureServer, where mapserver_n is 2 and the layer_id is in brackets after the layer name.
@@ -97,7 +100,7 @@ In the DAG file, the arguments for each layer are stored in dictionaries called
 
 ## Manually fetch layers
 
-If you need to pull a layer as a one-off task, this script allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on a on-prem server as it needs connection to insideto.
+If you need to pull a layer as a one-off task, `gcc_puller_functions.py` allows you to pull any layer from the GCC Rest API. Please note that the script must be run locally or on an on-prem server as it needs a connection to insideto.
 Before running the script, ensure that you have set up the appropriate environment with all necessary packages installed. You might have to set the `https_proxy` in your environment with your novell username and password in order to clone this repo or install packages. If you run into any issues, don't hesitate to ask a sysadmin. You can then install all packages in the `requirement.txt`, either with:
 1) Activate your virtual environment, it should automatically install them for you
@@ -120,7 +123,7 @@ Before running the script, ensure that you have set up the appropriate environme
 
 Now you are set to run the script!
 
-There are 4 inputs that need to be entered.
+There are 7 inputs that can be entered.
 
 `--mapserver`: Mapserver number, e.g. cotgeospatial_2 will be 2
 
@@ -130,9 +133,29 @@ There are 4 inputs that need to be entered.
 
 `--con`(optional): The path to the credential config file. Default is ~/db.cfg
 
-Example of pulling the library layer to the gis schema.
+`--is-audited` (optional flag): Whether the table will be audited or not. Specifying the flag on the command line sets this option to True; omitting it leaves the default False.
+
+`--primary-key` (required when pulling an audited table): Primary key for the layer
+
+`--is-partitioned` (optional flag): Whether the table will be a child table of a parent table or not. Specifying the flag on the command line sets this option to True; omitting it leaves the default False.
+
+Example of pulling the library layer (neither audited nor partitioned) to the gis schema.
+
+
+```python
+python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg
+```
+
+Example of pulling the intersection layer (partitioned) to the gis_core schema.
 
 ```python
-python gcc_layer_puller.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg
+python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 12 --layer-id 42 --schema-name gis_core --con db.cfg --is-partitioned
 ```
+
+Example of pulling the city_ward layer (audited) to the gis_core schema.
+
+
+```python
+python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 0 --layer-id 0 --schema-name gis_core --con db.cfg --is-audited --primary-key area_id
+```
\ No newline at end of file
diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py
index e5dd365c9..8a4fa32c6 100644
--- a/gis/gccview/gcc_puller_functions.py
+++ b/gis/gccview/gcc_puller_functions.py
@@ -42,10 +42,10 @@ def get_tablename(mapserver, layer_id):
 
     Parameters
     -----------
-    mapserver: string
+    mapserver : string
         The name of the mapserver we are accessing, returned from function mapserver_name
 
-    layer_id: integer
+    layer_id : integer
         Unique layer id that represents a single layer in the mapserver
 
     Returns
@@ -103,7 +103,7 @@ def create_audited_table(output_table, return_json, schema_name, primary_key, co
     primary_key : string
         Primary key for this layer, returned from dictionary pk_dict
 
-    con: Airflow Connection
+    con : Airflow Connection
         Could be the connection to bigdata or to on-prem server
 
     Returns
@@ -141,8 +141,8 @@
         LOGGER.info(create_sql.as_string(con))
         cur.execute(create_sql)
 
-        # owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name))
-        # cur.execute(owner_sql)
+        owner_sql = sql.SQL("ALTER TABLE IF EXISTS {schema_table} OWNER to gis_admins").format(schema_table = sql.Identifier(schema_name, temp_table_name))
+        cur.execute(owner_sql)
 
     # Add a pk
     with con:
@@ -166,7 +166,7 @@ def create_partitioned_table(output_table, return_json, schema_name, con):
     schema_name : string
         The schema in which the table will be inserted into
 
-    con: Airflow Connection
+    con : Airflow Connection
         Could be the connection to bigdata or to on-prem server
 
     Returns
@@ -206,7 +206,7 @@
 def create_table(output_table, return_json, schema_name, con):
     """
-    Function to create a new table in postgresql for the layer (for audited tables only)
+    Function to create a new table in postgresql for the layer (for regular tables, neither audited nor partitioned)
 
     Parameter
     ---------
     output_table : string
         Table name for postgresql, returned from function get_tablename
 
     return_json : json
         Resulting json response from calling the api, returned from function get_data
 
     schema_name : string
         The schema in which the table will be inserted into
 
-    primary_key : string
-        Primary key for this layer, returned from dictionary pk_dict
-
-    con: Connection
+    con : Connection
         Could be the connection to bigdata or to on-prem server
 
     Returns
     --------
     insert_column : SQL composed
         Composed object of column names and types used for creating a new postgresql table
-
-    excluded_column : SQL composed
-        Composed object that is similar to insert_column, but has 'EXCLUDED.' attached before each column name, used for UPSERT query
     """
     fields = return_json['fields']
@@ -391,6 +385,29 @@ def find_limit(return_json):
     return keep_adding
 
 def insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned):
+    """
+    Function to insert data into our postgresql database
+
+    Parameters
+    ----------
+    output_table : string
+        Table name for postgresql, returned from function get_tablename
+
+    insert_column : SQL composed
+        Composed object of column names and types used for creating a new postgresql table
+
+    return_json : json
+        Resulting json response from calling the api, returned from function get_data
+
+    schema_name : string
+        The schema in which the table will be inserted into
+
+    con : Airflow Connection
+        Could be the connection to bigdata or to on-prem server
+
+    is_audited : Boolean
+        Whether we want the table to be audited (True) or non-audited (False)
+
+    is_partitioned : Boolean
+        Whether we want the table to be partitioned (True) or neither audited nor partitioned (False)
+    """
     rows = []
     features = return_json['features']
     fields = return_json['fields']
@@ -450,7 +467,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
     schema_name : string
         The schema in which the table will be inserted into
 
-    con: Airflow Connection
+    con : Airflow Connection
         Could be the connection to bigdata or to on-prem server
 
     Returns
@@ -526,7 +543,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
     return successful_execution
 #-------------------------------------------------------------------------------------------------------
 # base main function, also compatible with Airflow
-def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = False):
+def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None, is_partitioned = True):
     """
     This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database.
@@ -541,16 +558,19 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
     schema_name : string
         The schema in which the table will be inserted into
 
-    is_audited: Boolean
-        Whether we want to have the table be audited (true) or be partitioned (false)
+    is_audited : Boolean
+        Whether we want the table to be audited (True) or non-audited (False)
 
-    cred: Airflow PostgresHook
+    cred : Airflow PostgresHook
         Contains credentials to enable a connection to a database
         Expects a valid cred input when running Airflow DAG
 
-    con: connection to database
+    con : connection to database
         Connection object that can connect to a particular database
         Expects a valid con object if using command prompt
+
+    is_partitioned : Boolean
+        Whether we want the table to be partitioned (True) or neither audited nor partitioned (False)
     """
 
     # For Airflow DAG
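
For one-off pulls it is also possible to skip the CLI wrapper and call `get_layer` directly from Python. The sketch below mirrors the README's city_ward example; it is an illustration rather than part of the patch, and it assumes `gcc_puller_functions.py` is importable from your working directory and that a psycopg2 connection object is passed as `con`, as the function's docstring describes for command-prompt use. All connection details are placeholders.

```python
# Minimal sketch of a one-off pull by calling get_layer() directly,
# mirroring the README's city_ward CLI example. Connection details are
# placeholders; the CLI instead builds the connection from the --con
# config file (default ~/db.cfg).
import psycopg2

from gcc_puller_functions import get_layer  # assumes the module is on your path

# Placeholder credentials; substitute your own database settings.
con = psycopg2.connect(
    host="localhost",
    dbname="bigdata",
    user="your_user",
    password="your_password",
)

# Audited pull: is_audited=True requires a primary key. is_partitioned is
# left at its default (True); per the README it only comes into play for
# non-audited layers.
get_layer(
    mapserver_n=0,
    layer_id=0,
    schema_name="gis_core",
    is_audited=True,
    con=con,
    primary_key="area_id",
)
```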