diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 38d1698f7..83480a858 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -6,8 +6,6 @@ from psycopg2 import sql from psycopg2.extras import execute_values import logging -from time import sleep -from airflow.exceptions import AirflowFailException import click CONFIG = configparser.ConfigParser() @@ -311,7 +309,6 @@ def get_data(mapserver, layer_id, max_number = None, record_max = None): LOGGER.error("Invalid HTTP response: ", err_h) except requests.exceptions.ConnectionError as err_c: LOGGER.error("Network problem: ", err_c) - sleep(10) except requests.exceptions.Timeout as err_t: LOGGER.error("Timeout: ", err_t) except requests.exceptions.RequestException as err: @@ -613,38 +610,25 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = features = return_json['features'] record_max=(len(features)) max_number = record_max - - if is_audited: - insert_audited_data(output_table, insert_column, return_json, schema_name, con) - else: - insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) - - counter += 1 - keep_adding = find_limit(return_json) - if keep_adding == False: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + # Insert data into the table + if is_audited: + insert_audited_data(output_table, insert_column, return_json, schema_name, con) else: - return_json = get_data(mapserver, layer_id, max_number = max_number, record_max = record_max) - if is_audited: - insert_audited_data(output_table, insert_column, return_json, schema_name, con) - else: - insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) - - counter += 1 - keep_adding = find_limit(return_json) - if keep_adding == True: - max_number = max_number + record_max - else: - LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) + insert_partitioned_data(schema_parent_table_insert, insert_column, return_json, schema_name, con) + # Update loop variables + counter += 1 + keep_adding = find_limit(return_json) + + if keep_adding: + max_number += record_max + else: + LOGGER.info('All records from [mapserver: %s, layerID: %d] have been inserted into %s', mapserver, layer_id, output_table) if is_audited: - successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) - - ''' - # Raise error if UPSERT failed - if not successful_task_run: - raise AirflowFailException - ''' + try: + successful_task_run = update_table(output_table, insert_column, excluded_column, primary_key, schema_name, con) + except Exception as err: + LOGGER.exception("Unable to update table %s", err) @click.command() @click.option('--mapserver', '-ms', type = int, required = True,