Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gcc puller generic #1044

Merged
merged 14 commits into from
Oct 7, 2024
Merged

Gcc puller generic #1044

merged 14 commits into from
Oct 7, 2024

Conversation

chmnata
Copy link
Collaborator

@chmnata chmnata commented Aug 23, 2024

What this pull request accomplishes:

  • Update gcc_layer_puller.py to pull any layer on the gcc rest api without having to choose between partition table/audit table.

Issue(s) this solves:

What, in particular, needs to reviewed:

  • if the readme is detailed enough, if the reviewer can try to test pull this with requirement.txt packages it would be greaattt.
  • also updated the gcc_puller_functions

What needs to be done by a sysadmin after this PR is merged

Have to test again on morbius before merging.

@chmnata chmnata requested a review from gabrielwol August 23, 2024 18:17
Copy link
Collaborator

@gabrielwol gabrielwol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice new feature to add! Being harsh on you and requesting some more de-duplication between the _puller and _functions versions 🥺

raise AirflowFailException
'''
try:
successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

successful_task_run is not used, can delete reference here and above.

gis/gccview/gcc_puller_functions.py Show resolved Hide resolved

The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means:
The pipeline consists of two files, `gcc_puller_functions.py` for the functions and `/dags/gcc_layers_pull.py` for the Airflow DAG. The main function that fetches the layers is called `get_layer` and it takes in five parameters. Here is a list that describes what each parameter means:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad for not updating it in #1032: can you add primary_key to the list of params below? and also line 91 is outdated.


return insert_column

def get_data(mapserver, layer_id, max_number = None, record_max = None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this identical to gcc_layer_functions.py get_layers minus the centreline exception? Think you can combine them?

LOGGER.info('Successfully inserted %d records into %s', len(rows), output_table)


def get_layer(mapserver_n, layer_id, schema_name, con = None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse get_layer from gcc_layer_functions.py with some of the params fixed using partial?

@gabrielwol
Copy link
Collaborator

Also closes #561 right?


Note that if the script doesn't work, one reason might be because your credentials don't have access to the GCC API.
```python
python gcc_layer_puller.py --mapserver 28 --layer-id 28 --schema-name gis --con db.cfg
Copy link
Collaborator

@gabrielwol gabrielwol Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update this example, eg.

python3 bdit_data-sources/gis/gccview/gcc_puller_functions.py --mapserver 28 --layer-id 28 --schema-name gwolofs --con db.cfg

Copy link
Collaborator

@gabrielwol gabrielwol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking this on Leo! Think there's still some refactoring to be done with the duplicate code and also the readme needs improvements please.


while keep_adding == True:
#--------------------------------
if is_manual:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The click command allows users to specify is-audited and primary-key, but this control structure makes manual pulls mutually exclusive with those features. Can we go back to allowing all times of pulls with use of cli?

while keep_adding == True:
if counter == 0:
# Without specifying the max_number and record_max, the number of data we get from this query
# should be the max record count of the layer if the layer is huge
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note query will default to max records of 2000

if not(is_audited) and primary_key is not None:
LOGGER.error("Non-audited tables do not use the primary key.")
#--------------------------------
while keep_adding == True:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This while loop is very similar to above, I think it can be combined for better readability

@@ -451,6 +498,58 @@ def insert_partitioned_data(schema_parent_table_insert, insert_column, return_js
with con.cursor() as cur:
execute_values(cur, insert, rows)
LOGGER.info('Successfully inserted %d records into %s', len(rows), schema_parent_table_insert)

def insert_data(output_table, insert_column, return_json, schema_name, con):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to be picky and ask you to combine the 3 insert_data functions. They each have just 1-2 lines different.

2. Add a new entry to "bigdata_layers" or "ptc_layers" dictionaries in airflow's variable depending on the destination database.
3. If is_audited = True, you must also add a primary key for the new layer to "gcc_layers" in the corresponding airflow variable.

## Manually fetch layers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a description of the 3 options: audited, partitioned, ... none? Each have different input requirements and audited/partitioned require target table to exist in database already I think.

Copy link
Collaborator

@gabrielwol gabrielwol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully tested manual tables and partitioned tables on Morbius.

Audited tables have a bug which @leo-oxu will look into it!

[2024-10-04, 13:34:33 EDT] {gcc_puller_functions.py:138} INFO - "area_id" numeric,"date_effective" timestamp without time zone,"date_expiry" timestamp without time zone,"area_attr_id" numeric,"area_type_id" numeric,"parent_area_id" numeric,"area_type" text,"area_class_id" numeric,"area_class" text,"area_short_code" text,"area_long_code" text,"area_name" text,"area_desc" text,"feature_code" integer,"feature_code_desc" text,"trans_id_create" numeric,"trans_id_expire" numeric,"x" numeric,"y" numeric,"longitude" numeric,"latitude" numeric,"objectid" integer,"shape_area" numeric,"shape_len" numeric,"geom" geometry
[2024-10-04, 13:34:33 EDT] {gcc_puller_functions.py:141} INFO - CREATE TABLE IF NOT EXISTS "gis"."_city_ward" ("area_id" numeric,"date_effective" timestamp without time zone,"date_expiry" timestamp without time zone,"area_attr_id" numeric,"area_type_id" numeric,"parent_area_id" numeric,"area_type" text,"area_class_id" numeric,"area_class" text,"area_short_code" text,"area_long_code" text,"area_name" text,"area_desc" text,"feature_code" integer,"feature_code_desc" text,"trans_id_create" numeric,"trans_id_expire" numeric,"x" numeric,"y" numeric,"longitude" numeric,"latitude" numeric,"objectid" integer,"shape_area" numeric,"shape_len" numeric,"geom" geometry)
[2024-10-04, 13:34:33 EDT] {taskinstance.py:3301} ERROR - Task failed with exception
Traceback (most recent call last):
...
  File "/home/airflow/airflow/dags/gcc_layers_pull.py", line 46, in pull_layer
    get_layer(
  File "/data/home/gwolofs/bdit_data-sources/gis/gccview/gcc_puller_functions.py", line 614, in get_layer
    insert_data(output_table, insert_column, return_json, schema_name, con, is_audited, is_partitioned)
  File "/data/home/gwolofs/bdit_data-sources/gis/gccview/gcc_puller_functions.py", line 445, in insert_data
    execute_values(cur, insert, rows)
  File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
psycopg2.errors.SyntaxError: INSERT has more expressions than target columns
LINE 1: ...L,NULL, -79.5302719567,43.7507242813,17344785,0,0,'SRID=4326...

Copy link
Collaborator

@gabrielwol gabrielwol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Thanks @chmnata for initiating and thanks Leo for bearing with all my comments.
Tested to be working via Airflow on Morbius and for pulling one-off layers locally.

@gabrielwol gabrielwol merged commit 1002f08 into master Oct 7, 2024
5 checks passed
gabrielwol pushed a commit that referenced this pull request Oct 7, 2024
@gabrielwol gabrielwol deleted the gcc_puller_generic branch October 7, 2024 16:14
gabrielwol pushed a commit that referenced this pull request Oct 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

GCC Puller: Add a generic function to import any layers on GCC to one's schema
3 participants