Skip to content

Commit

Permalink
Merge pull request #3 from MikeMoore63/main
Browse files Browse the repository at this point in the history
chore(release): propose changes to fix routine syncing and make native isntall work better with python3
  • Loading branch information
mike-m-hsbc authored Apr 10, 2023
2 parents 7a1c61e + 2b08474 commit 6a950e6
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 6 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
1.0.5 April 10th 2023
- Adjust jinja dependency so will allow Jina2 version3
- Fix issue with routine copying failing
- Fix issue with partial rather than full dataset names
- Copy scalar functions prior to views which often reference these

1.0.4 December 6th 2022
- Loosen grpcio dependency to correct constraint

Expand Down
143 changes: 140 additions & 3 deletions bqtools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@
creation_time
"""

FUNCCOMPARE = """SELECT
routine_name,
routine_body,
routine_type,
data_type,
routine_definition
FROM
`{0}.{1}.INFORMATION_SCHEMA.ROUTINES`
WHERE routine_type = "FUNCTION"
ORDER BY
1"""

RTNCOMPARE = """SELECT
routine_name,
routine_body,
Expand All @@ -149,6 +161,7 @@
routine_definition
FROM
`{0}.{1}.INFORMATION_SCHEMA.ROUTINES`
WHERE routine_type != "FUNCTION"
ORDER BY
1"""

Expand Down Expand Up @@ -3117,6 +3130,10 @@ def update_source_view_definition(self, view_definition, use_standard_sql):
view_definition = view_definition.replace(
r'[{}.{}.'.format(self.source_project, self.source_dataset),
"[{}:{}.".format(self.destination_project, self.destination_dataset))
# support short names
view_definition = view_definition.replace(
r'{}.'.format(self.source_dataset),
"{}.".format(self.destination_dataset))

return view_definition

Expand Down Expand Up @@ -3765,6 +3782,10 @@ def update_source_view_definition(self, view_definition, use_standard_sql):
view_definition = view_definition.replace(
r'[{}.{}.'.format(src_proj, src_dataset),
"[{}:{}.".format(dst_proj, dst_dataset))
# this should not be required but seems it is
view_definition = view_definition.replace(
r'{}.'.format(src_dataset),
"{}.".format(dst_dataset))
return view_definition


Expand Down Expand Up @@ -3813,6 +3834,7 @@ def coordinater(self, value):
self.__coordinater = value

def real_update_source_view_definition(self, view_definition, use_standard_sql):
# long form
view_definition = view_definition.replace(
r'`{}.{}.'.format(self.source_project, self.source_dataset),
"`{}.{}.".format(self.destination_project, self.destination_dataset))
Expand All @@ -3823,6 +3845,10 @@ def real_update_source_view_definition(self, view_definition, use_standard_sql):
view_definition = view_definition.replace(
r'[{}.{}.'.format(self.source_project, self.source_dataset),
"[{}:{}.".format(self.destination_project, self.destination_dataset))
# short form
view_definition = view_definition.replace(
r'{}.'.format(self.source_dataset),
"{}.".format(self.destination_dataset))
return view_definition

def discovery_update_table(self, table_api_rep, logging):
Expand Down Expand Up @@ -5027,7 +5053,10 @@ def remove_deleted_destination_routine(copy_driver, routine_name):
dstroutine_ref = bigquery.Routine(
"{}.{}.{}".format(copy_driver.destination_project, copy_driver.destination_dataset,
routine_name))
copy_driver.destination_client.delete_routine(dstroutine_ref)
try:
copy_driver.destination_client.delete_routine(dstroutine_ref)
except exceptions.NotFound as e:
pass
else:
copy_driver.get_logger().warning(
"Unable to remove routine from source {}.{}.{} as Routine class not defined in "
Expand Down Expand Up @@ -5067,7 +5096,14 @@ def create_destination_routine(copy_driver, routine_name, routine_input):
dstroutine_ref.imported_libraries = srcroutine.imported_libraries
dstroutine_ref.language = srcroutine.language
dstroutine_ref.type_ = srcroutine.type_
dstroutine = copy_driver.destination_client.create_routine(dstroutine_ref)
try:
dstroutine = copy_driver.destination_client.create_routine(dstroutine_ref)

# normally as referencing a routine not created
# so we accept this on basis next sync will resolve
except exceptions.BadRequest as e:
copy_driver.get_logger().exception(f"Unable to create routine {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} definition {routine_input['routine_definition']}")
return
return dstroutine
else:
copy_driver.get_logger().warning(
Expand Down Expand Up @@ -5384,8 +5420,8 @@ def sync_bq_datset(copy_driver, schema_threads=10, copy_data_threads=50):

wait_for_queue(schema_q, "Materialized view sychronization", 0.3, copy_driver.get_logger())

# scalar functions have to be copied in case in views
if "VIEW" in copy_driver.copy_types or "ROUTINE" in copy_driver.copy_types:

# Now do views
# views need applying in order
# we assume order created is the order
Expand All @@ -5406,6 +5442,107 @@ def sync_bq_datset(copy_driver, schema_threads=10, copy_data_threads=50):
query_cmek=copy_driver.query_cmek[1]):
view_or_routine_order.append(viewrow["table_name"])

# now list and compare views
source_routine_query = FUNCCOMPARE.format(copy_driver.source_project,
copy_driver.source_dataset)
destination_routine_query = FUNCCOMPARE.format(copy_driver.destination_project,
copy_driver.destination_dataset)
source_ended = False
destination_ended = False

source_generator = run_query(copy_driver.query_client, source_routine_query,
"List source functions", copy_driver.get_logger(),
location=copy_driver.source_location,
callback_on_complete=copy_driver.update_job_stats,
labels=BQSYNCQUERYLABELS,
query_cmek=copy_driver.query_cmek[1])
try:
source_row = next(source_generator)
except StopIteration:
source_ended = True

destination_generator = run_query(copy_driver.query_client, destination_routine_query,
copy_driver.get_logger(),
"List destination functions",
location=copy_driver.destination_location,
callback_on_complete=copy_driver.update_job_stats,
labels=BQSYNCQUERYLABELS,
query_cmek=copy_driver.query_cmek[0])
try:
destination_row = next(destination_generator)
except StopIteration:
destination_ended = True

while not source_ended or not destination_ended:
if not destination_ended and not source_ended and destination_row["routine_name"] \
== \
source_row["routine_name"]:
if copy_driver.istableincluded(source_row["routine_name"]):
copy_driver.increment_routines_synced()
expected_definition = copy_driver.update_source_view_definition(
source_row["routine_definition"], source_row["routine_type"])
if expected_definition != destination_row["routine_definition"]:
routines_to_apply[source_row["routine_name"]] = {
"routine_definition": expected_definition,
"routine_type": source_row["routine_type"],
"action": "patch_routine"}
else:
copy_driver.increment_routines_avoided()
try:
source_row = next(source_generator)
except StopIteration:
source_ended = True
try:
destination_row = next(destination_generator)
except StopIteration:
destination_ended = True
elif (destination_ended and not source_ended) or (
not source_ended and source_row["routine_name"] < destination_row[
"routine_name"]):
if copy_driver.istableincluded(source_row["routine_name"]):
copy_driver.increment_routines_synced()
expected_definition = copy_driver.update_source_view_definition(
source_row["routine_definition"], source_row["routine_type"])
routines_to_apply[source_row["routine_name"]] = {
"routine_definition": expected_definition,
"routine_type": source_row["routine_type"], "action": "create_routine"}
try:
source_row = next(source_generator)
except StopIteration:
source_ended = True
elif (source_ended and not destination_ended) or (
not destination_ended and source_row["routine_name"] > destination_row[
"routine_name"]):
remove_deleted_destination_routine(copy_driver, destination_row["routine_name"])
try:
destination_row = next(destination_generator)
except StopIteration:
destination_ended = True

for view in view_or_routine_order:
if view in routines_to_apply:
if routines_to_apply[view]["action"] == "create_routine":
create_destination_routine(copy_driver, view, routines_to_apply[view])
else:
patch_destination_routine(copy_driver, view, routines_to_apply[view])
if view in views_to_apply:
if views_to_apply[view]["action"] == "create_view":
create_destination_view(copy_driver, view, views_to_apply[view])
else:
patch_destination_view(copy_driver, view, views_to_apply[view])

wait_for_queue(schema_q, "View/Routine schema synchronization", 0.3,
copy_driver.get_logger())

if "VIEW" in copy_driver.copy_types or "ROUTINE" in copy_driver.copy_types:

# Now do views
# views need applying in order
# we assume order created is the order

views_to_apply = {}
routines_to_apply = {}

if "VIEW" in copy_driver.copy_types:
# now list and compare views
source_view_query = DSVIEWLISTQUERY.format(copy_driver.source_project,
Expand Down
2 changes: 1 addition & 1 deletion bqtools/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.0.4"
__version__ = "1.0.5"
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@
include_package_data=True,
scripts=['bqtools/bqsync'],
install_requires=[
"jinja2<3.0",
"jinja2>=2.0,<4.0",
"google-cloud<1.0",
"datetime<5.0",
"google-cloud-bigquery>=2.6.0,<3.0",
"google-cloud-storage>=1.0.0,<3.0",
"google-cloud-logging>=3.0,<4.0",
Expand Down

0 comments on commit 6a950e6

Please sign in to comment.