From 3b17593d673d2bd2a5c488740d91562aef75e145 Mon Sep 17 00:00:00 2001 From: Franco Reyes <79299724+francojreyes@users.noreply.github.com> Date: Tue, 26 Mar 2024 21:26:13 +1100 Subject: [PATCH] use upsert logic for insertion (#9) * Use upsert and move hasura helpers * Update docs to reflect upsert * Support running actions after insertion * Fixed bugs --- README.md | 28 ++++--- app/helpers/hasura.py | 124 +++++++++++++++++++++++++++ app/main.py | 191 +++++++++++++----------------------------- 3 files changed, 199 insertions(+), 144 deletions(-) create mode 100644 app/helpers/hasura.py diff --git a/README.md b/README.md index 54a8313..e7e3939 100644 --- a/README.md +++ b/README.md @@ -161,22 +161,26 @@ The scrape job should produce JSON output and send a HTTP POST request to `http: Inserts data into the PostgreSQL table specified by `table_name`. If such a table does not yet exist, it is created as specified in `sql_up`. +Insertion into Hasuragres uses 'upsertion' logic. This means that when inserting rows, if there is already an existing row with the same primary key, we update that row rather than raising an error. This approach is also used if `write_mode` is set to `"overwrite"` so that rows still present after overwriting don't get deleted and cascade down to any dependent tables. + Hasuragres keeps track of the SQL scripts used to create each table. If there is an inconsistency between the stored `sql_up` script and the one provided in the request, then the table is re-created using the new `sql_up`. The stored `sql_down` is used to drop the old table - it's important that `sql_down` is correct, otherwise updates to the table structure may fail. When a table is created, it is automatically tracked in Hasura and added to the GraphQL Schema. Corresponding query fields are created called `` and `_by_pk` (note that `table_name` will be in lowercase), with fields for each column of the table. Furthermore, any foreign key relationships are inferred, and fields containing nested objects are added to each relevant queryable data type. More information can be found [here](https://hasura.io/docs/latest/getting-started/how-it-works/index/). #### Parameters -| name | type | required | description | -|------------------------|--------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `metadata` | object | Yes | Instructions for creating/inserting into PostgreSQL tables. | -| `metadata.table_name` | str | Yes | Name of table to create/insert into.

Must match name of table created in `metadata.sql_up` (case insensitive). | -| `metadata.columns` | list[str] | Yes | List of column names that require insertion.

Must match column names in table created in `metadata.sql_up`, as well as the keys of each object in `payload` (case sensitive). | -| `metadata.write_mode` | str | No | One of `"truncate"`, meaning all rows in the table should be deleted before insertion, or `"append"`, meaning add to the existing data.

Defaults to `truncate`. | -| `metadata.sql_execute` | str | No | SQL commands to run *before* each insertion. | -| `metadata.sql_up` | str | Yes | SQL commands used to set UP (create) a table to store the scraped data, as well as any related data types. | -| `metadata.sql_down` | str | Yes | SQL commands to tear DOWN (drop) all objects created by `metadata.sql_up`.

Should use the CASCADE option when dropping, otherwise the script may fail unexpectedly when other tables rely on this one. | -| `payload` | list[object] | Yes | List of objects to insert into the database.

Ideally, this is simply the JSON output of the scraper. | +| name | type | required | description | +|-----------------------|--------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `metadata` | object | Yes | Instructions for creating/inserting into PostgreSQL tables. | +| `metadata.table_name` | str | Yes | Name of table to create/insert into.

Must match name of table created in `metadata.sql_up` (case insensitive). | +| `metadata.columns` | list[str] | Yes | List of column names that require insertion.

Must match column names in table created in `metadata.sql_up`, as well as the keys of each object in `payload` (case sensitive). | +| `metadata.write_mode` | str | No | One of `"overwrite"` or `"append"`.

Defaults to `"overwrite"`. | +| `metadata.sql_before` | str | No | SQL command to run *before* the insertion. | +| `metadata.sql_after` | str | No | SQL command to run *after* the insertion. | +| `metadata.sql_up` | str | Yes | SQL commands used to set UP (create) a table to store the scraped data, as well as any related data types. | +| `metadata.sql_down` | str | Yes | SQL commands to tear DOWN (drop) all objects created by `metadata.sql_up`.

Should use the CASCADE option when dropping, otherwise the script may fail unexpectedly when other tables rely on this one. | +| `payload` | list[object] | Yes | List of objects to insert into the database.

Ideally, this is simply the JSON output of the scraper. | + #### Example Request @@ -205,9 +209,9 @@ If you want to connect multiple scrapers to the same table, for example if you h Both scrapers should maintain an up-to-date copy of the `sql_up` and `sql_down` commands sent to Hasuragres. Furthermore, if you need to update these commands, please be sure to update all scrapers around the same time without much delay between each. If at any point the scrapers have different versions of the SQL, then any inserts will simply drop the table and all data from the other scraper(s). -It is also important that you make use of the `sql_execute` and `write_mode` fields of the insert metadata. By default, inserts are set to truncate the table they insert to, which would only allow data from one scraper at any one time. For multiple scrapers, they should each be in `"append"` mode so that scrapers can add on to the data from other scrapers. +It is also important that you make use of the `sql_before` and `write_mode` fields of the insert metadata. By default, inserts are set to truncate the table they insert to, which would only allow data from one scraper at any one time. For multiple scrapers, they should each be in `"append"` mode so that scrapers can add on to the data from other scrapers. -Also, `sql_execute` should contain commands(s) to remove only those rows that were previously inserted by the scraper - it may be useful to add some field to the schema that identifies the source of each row if there is no easy way to distinguish between the data sources. +Also, `sql_before` should contain commands(s) to remove only those rows that were previously inserted by the scraper - it may be useful to add some field to the schema that identifies the source of each row if there is no easy way to distinguish between the data sources. ## Testing Scrapers diff --git a/app/helpers/hasura.py b/app/helpers/hasura.py new file mode 100644 index 0000000..4857a9b --- /dev/null +++ b/app/helpers/hasura.py @@ -0,0 +1,124 @@ +import os +import requests + +from dotenv import load_dotenv + +# Ensure HASURA_GRAPHQL_ env vars are set +load_dotenv() +HGQLA_SECRET = os.environ.get("HASURA_GRAPHQL_ADMIN_SECRET") +if not HGQLA_SECRET: + print("HASURA_GRAPHQL_ADMIN_SECRET not set") + exit(1) + +HGQL_HOST = os.environ.get('HASURA_GRAPHQL_HOST') +if not HGQL_HOST: + print("HASURA_GRAPHQL_HOST not set") + exit(1) + +HGQL_PORT = os.environ.get('HASURA_GRAPHQL_PORT') +if not HGQL_PORT: + print("HASURA_GRAPHQL_PORT not set") + exit(1) + + +def send_hasura_api_query(query: dict): + return requests.post( + f"http://{HGQL_HOST}:{HGQL_PORT}/v1/metadata", + headers={ + "X-Hasura-Admin-Secret": HGQLA_SECRET + }, + json=query + ) + + +# The below functions are used to adhere to Hasura's relationship nomenclature +# https://hasura.io/docs/latest/schema/postgres/using-existing-database/ +# Possibly use the `inflect` module if they aren't sufficient +def plural(s: str) -> str: + return s if s.endswith("s") else s + "s" + + +def singular(s: str) -> str: + return s if not s.endswith("s") else s[:-1] + + +def infer_relationships(table_name: str) -> list[object]: + """ + Use pg_suggest_relationships to infer any relations from foreign keys + in the given table. Returns an array containing queries to track each + relationship. + + See https://hasura.io/docs/latest/api-reference/metadata-api/relationship/ + """ + res = send_hasura_api_query({ + "type": "pg_suggest_relationships", + "version": 1, + "args": { + "omit_tracked": True, + "tables": [table_name] + } + }) + + queries = [] + for rel in res.json()["relationships"]: + if rel["type"] == "object": + queries.append({ + "type": "pg_create_object_relationship", + "args": { + "source": "default", + "table": rel["from"]["table"]["name"], + "name": singular(rel["to"]["table"]["name"]), + "using": { + "foreign_key_constraint_on": rel["from"]["columns"] + } + } + }) + elif rel["type"] == "array": + queries.append({ + "type": "pg_create_array_relationship", + "args": { + "source": "default", + "table": rel["from"]["table"]["name"], + "name": plural(rel["to"]["table"]["name"]), + "using": { + "foreign_key_constraint_on": { + "table": rel["to"]["table"]["name"], + "columns": rel["to"]["columns"] + } + } + } + }) + + return queries + + +def track_table(table_name: str): + send_hasura_api_query({ + "type": "pg_track_table", + "args": { + "source": "default", + "schema": "public", + "name": table_name + } + }) + + # Allow anonymous access + send_hasura_api_query({ + "type": "pg_create_select_permission", + "args": { + "source": "default", + "table": table_name, + "role": "anonymous", + "permission": { + "columns": "*", + "filter": {}, + "allow_aggregations": True + } + } + }) + + # Track relationships + send_hasura_api_query({ + "type": "bulk", + "args": infer_relationships(table_name) + }) diff --git a/app/main.py b/app/main.py index 4cfb0c9..445f9fe 100644 --- a/app/main.py +++ b/app/main.py @@ -2,52 +2,35 @@ from typing import Any, Literal, Optional import psycopg2 -import requests import uvicorn -from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from psycopg2 import Error -from psycopg2.extras import execute_values from psycopg2.extensions import connection, cursor from pydantic import BaseModel, Field -# Ensure HASURA_GRAPHQL_ env vars are set -load_dotenv() -HGQLA_SECRET = os.environ.get("HASURA_GRAPHQL_ADMIN_SECRET") -if not HGQLA_SECRET: - print("HASURA_GRAPHQL_ADMIN_SECRET not set") - exit(1) - -HGQL_HOST = os.environ.get('HASURA_GRAPHQL_HOST') -if not HGQL_HOST: - print("HASURA_GRAPHQL_HOST not set") - exit(1) - -HGQL_PORT = os.environ.get('HASURA_GRAPHQL_PORT') -if not HGQL_PORT: - print("HASURA_GRAPHQL_PORT not set") - exit(1) +from helpers.hasura import track_table class Metadata(BaseModel): table_name: str - sql_execute: Optional[str] = Field(None, description='command to execute before running anything else') + sql_before: Optional[str] = Field(None, description='command to execute before running the insert') + sql_after: Optional[str] = Field(None, description='command to execute after running the insert') sql_up: str # SQL to set UP table and related data types/indexes sql_down: str # SQL to tear DOWN a table (should be the opp. of up) columns: list[str] # list of column names that require insertion - write_mode: Optional[Literal['append', 'truncate']] = Field('truncate', description='mode in which to write to the database') + write_mode: Literal['append', 'overwrite'] = Field('overwrite', description='mode in which to write to the database') -conn = None -cur = None +conn: connection = None +cur: cursor = None try: conn = psycopg2.connect(user=os.environ.get('POSTGRES_USER'), - password=os.environ.get('POSTGRES_PASSWORD'), - host=os.environ.get('POSTGRES_HOST'), - port=os.environ.get('POSTGRES_PORT'), - database=os.environ.get('POSTGRES_DB')) + password=os.environ.get('POSTGRES_PASSWORD'), + host=os.environ.get('POSTGRES_HOST'), + port=os.environ.get('POSTGRES_PORT'), + database=os.environ.get('POSTGRES_DB')) cur = conn.cursor() app = FastAPI() @@ -109,75 +92,51 @@ def create_table(metadata: Metadata) -> bool: return False -def send_hasura_api_query(query: dict): - return requests.post( - f"http://{HGQL_HOST}:{HGQL_PORT}/v1/metadata", - headers={ - "X-Hasura-Admin-Secret": HGQLA_SECRET - }, - json=query - ) +def get_primary_key_columns(table_name: str) -> list[str]: + cmd = f""" + SELECT c.column_name + FROM information_schema.columns c + JOIN information_schema.key_column_usage kcu + ON c.table_name = kcu.table_name + AND c.column_name = kcu.column_name + JOIN information_schema.table_constraints tc + ON kcu.table_name = tc.table_name + AND kcu.constraint_name = tc.constraint_name + WHERE c.table_name = '{table_name}' + AND tc.constraint_type = 'PRIMARY KEY'; + """ + cur.execute(cmd) + return [row[0] for row in cur.fetchall()] -# The below functions are used to adhere to Hasura's relationship nomenclature -# https://hasura.io/docs/latest/schema/postgres/using-existing-database/ -# Possibly use the `inflect` module if they aren't sufficient -def plural(s: str) -> str: - return s if s.endswith("s") else s + "s" +def execute_upsert(metadata: Metadata, payload: list[Any]): + columns = [f'"{col}"' for col in metadata.columns] + key_columns = [f'"{col}"' for col in get_primary_key_columns(metadata.table_name)] + non_key_columns = [col for col in columns if col not in key_columns] -def singular(s: str) -> str: - return s if not s.endswith("s") else s[:-1] + cmd = f""" + INSERT INTO {metadata.table_name}({", ".join(columns)}) + VALUES ({", ".join(["%s"] * len(columns))}) + ON CONFLICT ({", ".join(key_columns)}) + DO UPDATE SET {", ".join(f"{col} = EXCLUDED.{col}" for col in non_key_columns)}; + """ + values = [tuple(row[col] for col in metadata.columns) for row in payload] + cur.executemany(cmd, values) -def infer_relationships(table_name: str) -> list[object]: - """ - Use pg_suggest_relationships to infer any relations from foreign keys - in the given table. Returns an array containing queries to track each - relationship. - See https://hasura.io/docs/latest/api-reference/metadata-api/relationship/ +def execute_delete(metadata: Metadata, payload: list[Any]): + key_columns = get_primary_key_columns(metadata.table_name) + quoted_key_columns = [f'"{col}"' for col in key_columns] + + cmd = f""" + DELETE FROM {metadata.table_name} + WHERE ({", ".join(quoted_key_columns)}) NOT IN %s; """ - res = send_hasura_api_query({ - "type": "pg_suggest_relationships", - "version": 1, - "args": { - "omit_tracked": True, - "tables": [table_name] - } - }) - - queries = [] - for rel in res.json()["relationships"]: - if rel["type"] == "object": - queries.append({ - "type": "pg_create_object_relationship", - "args": { - "source": "default", - "table": rel["from"]["table"]["name"], - "name": singular(rel["to"]["table"]["name"]), - "using": { - "foreign_key_constraint_on": rel["from"]["columns"] - } - } - }) - elif rel["type"] == "array": - queries.append({ - "type": "pg_create_array_relationship", - "args": { - "source": "default", - "table": rel["from"]["table"]["name"], - "name": plural(rel["to"]["table"]["name"]), - "using": { - "foreign_key_constraint_on": { - "table": rel["to"]["table"]["name"], - "columns": rel["to"]["columns"] - } - } - } - }) - - return queries + values = tuple(tuple(row[col] for col in key_columns) for row in payload) + + cur.execute(cmd, (values,)) @app.post("/insert") @@ -191,19 +150,16 @@ def insert(metadata: Metadata, payload: list[Any]): raise HTTPException(status_code=400, detail=err_msg) try: - # execute whatever SQL is required - if metadata.sql_execute: - cur.execute(metadata.sql_execute) - if metadata.write_mode == 'truncate': - # Remove old data - cmd = f'TRUNCATE {metadata.table_name} CASCADE' - cur.execute(cmd) - - # Insert new data - values = [tuple(row[col] for col in metadata.columns) for row in payload] - metadata.columns = [f'"{col}"' for col in metadata.columns] - cmd = f'INSERT INTO {metadata.table_name}({", ".join(metadata.columns)}) VALUES %s' - execute_values(cur, cmd, values) + if metadata.sql_before: + cur.execute(metadata.sql_before) + + execute_upsert(metadata, payload) + if metadata.write_mode == 'overwrite': + # Delete rows not in payload + execute_delete(metadata, payload) + + if metadata.sql_after: + cur.execute(metadata.sql_after) except (Exception, Error) as error: err_msg = "Error while inserting into PostgreSQL table: " + str(error) print(err_msg) @@ -212,38 +168,9 @@ def insert(metadata: Metadata, payload: list[Any]): conn.commit() - # Run Hasura actions - must be done after transaction committed + # Run Hasura actions - must be done after transaction committed otherwise Hasura won't see the table if created: - # Track table - send_hasura_api_query({ - "type": "pg_track_table", - "args": { - "source": "default", - "schema": "public", - "name": metadata.table_name.lower() - } - }) - - # Allow anonymous access - send_hasura_api_query({ - "type": "pg_create_select_permission", - "args": { - "source": "default", - "table": metadata.table_name.lower(), - "role": "anonymous", - "permission": { - "columns": "*", - "filter": {}, - "allow_aggregations": True - } - } - }) - - # Track relationships - send_hasura_api_query({ - "type": "bulk", - "args": infer_relationships(metadata.table_name.lower()) - }) + track_table(metadata.table_name.lower()) return {}