Skip to content

Commit

Permalink
Merge pull request #22 from MikeMoore63/feat/tbl_routines
Browse files Browse the repository at this point in the history
feat: add support for table functions and complex parameters and types when falling back on the SQL approach to sync routines
  • Loading branch information
mike-m-hsbc authored Aug 25, 2024
2 parents ee2f8ca + a83b9d2 commit 200b85c
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 33 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ venv/
ENV/
env.bak/
venv.bak/
venv-*

# Spyder project settings
.spyderproject
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.0.7 Aug 25 2024
- Add support for table functions
- Add support for complex arguments and return types for routines when we have to fall back to SQL mode
- Drop boto as a dependency as it is not supported in Python 3.12

1.0.6 Jan 1st 2024
- Move to pyproject.toml from setup.py to make package more modern
- Updates and improve testing and add tox actions on pull requests
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "bqtools-json" # Required
#
# For a discussion on single-sourcing the version, see
# https://packaging.python.org/guides/single-sourcing-package-version/
version = "1.0.6" # Required
version = "1.0.7" # Required

# This is a one-line description or tagline of what your project does. This
# corresponds to the "Summary" metadata field:
Expand Down Expand Up @@ -90,7 +90,6 @@ dependencies = [ # Optional
"google-cloud-storage>=1.0.0,<3.0",
"google-cloud-logging>=3.0,<4.0",
"absl-py>=1,<3",
"boto<3.0",
"google-api-python-client>=2.0.0,<3.0",
"grpcio~=1.0"
]
Expand Down Expand Up @@ -138,6 +137,9 @@ bqsync = "bqtools:bqsync"
[tool.black]
line-length = 88

[tool.check-manifest]
ignore = ["renovate.json"]

[build-system]
# These are the assumed default build requirements from pip:
# https://pip.pypa.io/en/stable/reference/pip/#pep-517-and-518-support
Expand Down
83 changes: 63 additions & 20 deletions src/bqtools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1288,11 +1288,11 @@ def add_data_check(SCHEMA, prefix=None, depth=0):
):
# add the unnestof repeated base type can use own field name
fieldname = "{}{}".format(aliasdict["alias"], field.name)
aliasdict[
"extrajoinpredicates"
] = """{}
aliasdict["extrajoinpredicates"] = (
"""{}
LEFT JOIN UNNEST(`{}`) AS `{}`""".format(
aliasdict["extrajoinpredicates"], field.name, fieldname
aliasdict["extrajoinpredicates"], field.name, fieldname
)
)
if field.field_type == "STRING":
expression_list.append("IFNULL(`{0}`,'')".format(fieldname))
Expand Down Expand Up @@ -1342,13 +1342,13 @@ def add_data_check(SCHEMA, prefix=None, depth=0):
newprefix.append(aliasdict["alias"])

# add the unnest
aliasdict[
"extrajoinpredicates"
] = """{}
aliasdict["extrajoinpredicates"] = (
"""{}
LEFT JOIN UNNEST(`{}`) AS {}""".format(
aliasdict["extrajoinpredicates"],
"`.`".join(prefix),
aliasdict["alias"],
aliasdict["extrajoinpredicates"],
"`.`".join(prefix),
aliasdict["alias"],
)
)

# add the fields
Expand Down Expand Up @@ -1377,11 +1377,11 @@ def add_data_check(SCHEMA, prefix=None, depth=0):

predicates = self.comparison_predicates(table.table_id, retpartition_time)
if len(predicates) > 0:
aliasdict[
"extrajoinpredicates"
] = """{}
aliasdict["extrajoinpredicates"] = (
"""{}
WHERE ({})""".format(
aliasdict["extrajoinpredicates"], ") AND (".join(predicates)
aliasdict["extrajoinpredicates"], ") AND (".join(predicates)
)
)

return retpartition_time, default, aliasdict["extrajoinpredicates"]
Expand Down Expand Up @@ -5791,6 +5791,42 @@ def patch_destination_routine(copy_driver, routine_name, routine_input):
create_destination_routine(copy_driver, routine_name, routine_input)


def create_sql_arg_type(arg):
    """
    Build the SQL fragment that declares one routine argument.

    The argument's ``name`` is wrapped in backticks and followed by a single
    space and the SQL type text produced by ``create_sql_type`` for the
    argument's ``data_type``.
    """
    quoted_name = f"`{arg.name}`"
    sql_type = create_sql_type(arg.data_type)
    # NOTE(review): plain concatenation is kept on purpose. create_sql_type can
    # hand back ``type_kind`` itself rather than a freshly built string, so
    # confirm how that value formats before switching this to interpolation.
    return quoted_name + " " + sql_type


def create_sql_struct_type(arg):
    """
    Build the SQL fragment that declares one member of a STRUCT type.

    ``arg`` is a struct field exposing ``name`` and ``type`` attributes. For a
    named field the result is the backtick-quoted name, a space and the SQL
    type text from ``create_sql_type``. For a field without a name only the
    type text is returned.
    """
    sql_type = create_sql_type(arg.type)
    # A field without a name must not be quoted: interpolating a missing name
    # would emit the literal identifier `None` and silently rename the field.
    if not arg.name:
        return sql_type
    return f"`{arg.name}`" + " " + sql_type


def create_sql_type(data_type):
    """
    Render a standard SQL data type description as SQL type text.

    Anything that is neither a STRUCT nor an ARRAY is returned as its
    ``type_kind`` unchanged. A STRUCT becomes ``STRUCT<...>`` with its fields
    rendered by ``create_sql_struct_type`` and joined by commas. An ARRAY
    becomes ``ARRAY<...>`` wrapping the recursively rendered element type.
    """
    type_names = bigquery.enums.StandardSqlTypeNames
    kind = data_type.type_kind

    if kind == type_names.STRUCT:
        # One "`name` TYPE" entry per struct member, comma separated.
        inner = ",".join(
            create_sql_struct_type(member)
            for member in data_type.struct_type.fields
        )
    elif kind == type_names.ARRAY:
        # Arrays nest the rendering of their element type.
        inner = create_sql_type(data_type.array_element_type)
    else:
        # Simple types need no parameters; hand the kind back as it is.
        return kind

    return kind + "<" + inner + ">"


def create_destination_routine(copy_driver, routine_name, routine_input):
"""
Create a routine based on source routine
Expand Down Expand Up @@ -5832,22 +5868,29 @@ def create_destination_routine(copy_driver, routine_name, routine_input):
f"Unable to create routine {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} definition {routine_input['routine_definition']}"
)
if (
dstroutine_ref.type_ == "SCALAR_FUNCTION"
dstroutine_ref.type_ in ["SCALAR_FUNCTION", "TABLE_VALUED_FUNCTION"]
and dstroutine_ref.language == "SQL"
):
copy_driver.get_logger().info(
"As scalar function and SQL attempting adding as query"
"As scalar/table function and SQL attempting adding as query"
)
function_as_query = f"""CREATE OR REPLACE FUNCTION `{dstroutine_ref.project}.{dstroutine_ref.dataset_id}.{dstroutine_ref.routine_id}` ({",".join([arg.name + " " + arg.data_type.type_kind for arg in dstroutine_ref.arguments])}) AS
type = ""
if dstroutine_ref.type_ == "TABLE_VALUED_FUNCTION":
type = "TABLE"
language = ""
if dstroutine_ref.language == "JS":
language = "\nLANGUAGE js\n"
function_as_query = f"""CREATE OR REPLACE {type} FUNCTION `{dstroutine_ref.project}.{dstroutine_ref.dataset_id}.{dstroutine_ref.routine_id}` ({",".join([create_sql_arg_type(arg) for arg in dstroutine_ref.arguments])}) {language}
{"RETURNS " + create_sql_type(dstroutine_ref.return_type) if dstroutine_ref.return_type else ""}
OPTIONS (description="{dstroutine_ref.description if dstroutine_ref.description else ""}") AS
({dstroutine_ref.body})
{"RETURNS " + dstroutine_ref.return_type.type_kind if dstroutine_ref.return_type else ""}
OPTIONS (description="{dstroutine_ref.description if dstroutine_ref.description else ""}")"""
"""
try:
for result in run_query(
copy_driver.query_client,
function_as_query,
copy_driver.get_logger(),
"Apply SQL scalar function",
"Apply SQL scalar/table function",
location=copy_driver.destination_location,
callback_on_complete=copy_driver.update_job_stats,
labels=BQSYNCQUERYLABELS,
Expand Down
23 changes: 12 additions & 11 deletions src/bqtools/bqsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import os
import sys

import boto
from urllib.parse import urlparse
import google.cloud.logging
from absl import app
from absl import flags
Expand Down Expand Up @@ -255,26 +255,27 @@ def main(argv):
exit(-255)

# deal with if we have proxies, intercept certificates etc
ca_certificates_file = boto.config.get("Boto", "ca_certificates_file", "system")
url = urlparse(os.environ.get("https_proxy", os.environ.get("HTTPS_PROXY", None)))

ca_certificates_file = os.environ.get("REQUESTS_CA_BUNDLE", "system")
if ca_certificates_file != "system":
os.environ["REQUESTS_CA_BUNDLE"] = ca_certificates_file

proxy_user = boto.config.get("Boto", "proxy_user", None)
proxy_host = boto.config.get("Boto", "proxy", None)
ca_certificates_file = None
proxy_user = url.username
proxy_host = url.hostname
proxy = ""
if proxy_user is not None:
proxy = "http://{}:{}@{}:{}".format(
boto.config.get("Boto", "proxy_user", None),
boto.config.get("Boto", "proxy_pass", None),
boto.config.get("Boto", "proxy", None),
boto.config.getint("Boto", "proxy_port", 0),
url.username,
url.password,
url.hostname,
url.port,
)
else:
if proxy_host is not None:
proxy = "http://{}:{}".format(
boto.config.get("Boto", "proxy", None),
boto.config.getint("Boto", "proxy_port", 0),
url.hostname,
url.port,
)
if proxy != "":
os.environ["HTTP_PROXY"] = proxy
Expand Down

0 comments on commit 200b85c

Please sign in to comment.