support geo columns in test tables #3

Merged
6 commits merged on Jan 10, 2025
Changes from all commits
carto_extension.py (62 additions & 13 deletions)
@@ -5,6 +5,7 @@
 from uuid import uuid4
 import argparse
 import base64
+from shapely import wkt
 import hashlib
 import json
 import os
@@ -359,7 +360,34 @@ def deploy(destination):
 
 
 def _upload_test_table_bq(filename, component):
-    dataset_id = os.getenv("BQ_TEST_DATASET")
+    schema = []
+    with open(filename) as f:
+        data = [json.loads(l) for l in f.readlines()]
+    if os.path.exists(filename.replace(".ndjson", ".schema")):
+        with open(filename.replace(".ndjson", ".schema")) as f:
+            jsonschema = json.load(f)
+            for key, value in jsonschema.items():
+                schema.append(bigquery.SchemaField(key, value))
+    else:
+        for key, value in data[0].items():
+            if isinstance(value, int):
+                data_type = 'INT64'
+            elif isinstance(value, str):
+                try:
+                    wkt.loads(value)
+                    data_type = 'GEOGRAPHY'
+                except Exception:
+                    data_type = 'STRING'
+            elif isinstance(value, float):
+                data_type = 'FLOAT64'
+            else:
+                try:
+                    wkt.loads(value)
+                    data_type = 'GEOGRAPHY'
+                except Exception:
+                    data_type = 'STRING'
+            schema.append(bigquery.SchemaField(key, data_type))
+    dataset_id = os.getenv('BQ_TEST_DATASET')
     table_id = f"_test_{component['name']}_{os.path.basename(filename).split('.')[0]}"
 
     dataset_ref = bq_client().dataset(dataset_id)
@@ -368,6 +396,7 @@ def _upload_test_table_bq(filename, component):
     job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
     job_config.autodetect = True
     job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+    job_config.schema = schema
 
     with open(filename, "rb") as source_file:
         job = bq_client().load_table_from_file(
@@ -384,25 +413,45 @@
 def _upload_test_table_sf(filename, component):
     with open(filename) as f:
         data = [json.loads(l) for l in f.readlines()]
+    if os.path.exists(filename.replace(".ndjson", ".schema")):
+        with open(filename.replace(".ndjson", ".schema")) as f:
+            data_types = json.load(f)
+    else:
+        data_types = {}
+        for key, value in data[0].items():
+            if isinstance(value, int):
+                data_types[key] = 'NUMBER'
+            elif isinstance(value, str):
+                try:
+                    wkt.loads(value)
+                    data_types[key] = 'GEOGRAPHY'
+                except Exception:
+                    data_types[key] = 'VARCHAR'
+            elif isinstance(value, float):
+                data_types[key] = 'FLOAT'
+            else:
+                try:
+                    wkt.loads(value)
+                    data_types[key] = 'GEOGRAPHY'
+                except Exception:
+                    data_types[key] = 'VARCHAR'
     table_id = f"_test_{component['name']}_{os.path.basename(filename).split('.')[0]}"
     create_table_sql = f"CREATE OR REPLACE TABLE {sf_workflows_temp}.{table_id} ("
     for key, value in data[0].items():
-        if isinstance(value, int):
-            data_type = "NUMBER"
-        elif isinstance(value, str):
-            data_type = "VARCHAR"
-        elif isinstance(value, float):
-            data_type = "FLOAT"
-        else:
-            data_type = "VARCHAR"
-        create_table_sql += f"{key} {data_type}, "
-    create_table_sql = create_table_sql.rstrip(", ")
-    create_table_sql += ");\n"
+        create_table_sql += f'{key} {data_types[key]}, '
+    create_table_sql = create_table_sql.rstrip(', ')
+    create_table_sql += ');\n'
     cursor = sf_client().cursor()
     cursor.execute(create_table_sql)
     for row in data:
         insert_sql = f"INSERT INTO {sf_workflows_temp}.{table_id} ({', '.join(row.keys())}) VALUES ({', '.join(['%s'] * len(row))})"
-        cursor.execute(insert_sql, list(row.values()))
+        values = []
+        for key, value in row.items():
+            if data_types[key] == 'GEOGRAPHY':
+                values.append(f"ST_GEOGRAPHYFROMWKT('{value}')")
+            else:
+                values.append(value)
+        cursor.execute(insert_sql, values)
     cursor.close()


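Both loaders now share the same detection idea: probe each value of the first data row with shapely's WKT parser, and treat anything that parses as a GEOGRAPHY column, falling back to the warehouse's string type otherwise. A minimal standalone sketch of that probe (the helper name infer_bq_type and the demo row are illustrative, not part of this PR):

from shapely import wkt

def infer_bq_type(value):
    # Mirrors the inference added above: ints and floats map directly,
    # anything that parses as WKT becomes GEOGRAPHY, the rest STRING.
    if isinstance(value, int):
        return 'INT64'
    if isinstance(value, float):
        return 'FLOAT64'
    try:
        wkt.loads(value)  # raises on anything that is not valid WKT
        return 'GEOGRAPHY'
    except Exception:
        return 'STRING'

row = {"id": 1, "name": "Alice", "geo": "POINT(1 1)"}
for key, value in row.items():
    print(key, infer_bq_type(value))  # id INT64, name STRING, geo GEOGRAPHY

Because only data[0] is inspected, a column whose first value is null or plain text will not be typed as GEOGRAPHY even if later rows hold WKT geometries; the .schema sidecar path can pin types explicitly in such cases.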
components/template/test/fixtures/1.json (21 additions & 18 deletions)
@@ -1,19 +1,22 @@
-{
-    "output_table": [
-        {
-            "name": "Bob",
-            "id": 2,
-            "fixed_value_col": "test"
-        },
-        {
-            "name": "Carol",
-            "id": 3,
-            "fixed_value_col": "test"
-        },
-        {
-            "name": "Alice",
-            "id": 1,
-            "fixed_value_col": "test"
-        }
-    ]
+{
+    "output_table": [
+        {
+            "id": 1,
+            "name": "Alice",
+            "geo": "POINT(1 1)",
+            "fixed_value_col": "test"
+        },
+        {
+            "id": 3,
+            "name": "Carol",
+            "geo": "POINT(1 1)",
+            "fixed_value_col": "test"
+        },
+        {
+            "id": 2,
+            "name": "Bob",
+            "geo": "POINT(1 1)",
+            "fixed_value_col": "test"
+        }
+    ]
 }
components/template/test/fixtures/2.json (21 additions & 18 deletions)
@@ -1,19 +1,22 @@
-{
-    "output_table": [
-        {
-            "name": "Bob",
-            "id": 2,
-            "fixed_value_col": "test2"
-        },
-        {
-            "name": "Alice",
-            "id": 1,
-            "fixed_value_col": "test2"
-        },
-        {
-            "name": "Carol",
-            "id": 3,
-            "fixed_value_col": "test2"
-        }
-    ]
+{
+    "output_table": [
+        {
+            "id": 2,
+            "name": "Bob",
+            "geo": "POINT(1 1)",
+            "fixed_value_col": "test2"
+        },
+        {
+            "id": 1,
+            "name": "Alice",
+            "geo": "POINT(1 1)",
+            "fixed_value_col": "test2"
+        },
+        {
+            "id": 3,
+            "name": "Carol",
+            "geo": "POINT(1 1)",
+            "fixed_value_col": "test2"
+        }
+    ]
 }
components/template/test/table1.ndjson (3 additions & 3 deletions)
@@ -1,3 +1,3 @@
{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":1,"name":"Alice", "geo": "POINT(1 1)"}
{"id":2,"name":"Bob", "geo": "POINT(1 1)"}
{"id":3,"name":"Carol", "geo": "POINT(1 1)"}
requirements.txt (2 additions & 1 deletion)
@@ -1,3 +1,4 @@
 google-cloud-bigquery
 snowflake-connector-python
-python-dotenv
+python-dotenv
+shapely
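Note that carto_extension.py imports shapely at module level (from shapely import wkt), so the new dependency must be installed even for components whose test tables contain no geo columns.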