From fca8d1d550f35264ce0b4e33118061c23d994316 Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Mon, 10 Apr 2023 20:43:20 +0100 Subject: [PATCH 1/3] fix(dependencies): eliminate datetime version as part of python, loosend top end jinja version --- setup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/setup.py b/setup.py index ac24358..54f860c 100644 --- a/setup.py +++ b/setup.py @@ -40,9 +40,8 @@ include_package_data=True, scripts=['bqtools/bqsync'], install_requires=[ - "jinja2<3.0", + "jinja2>=2.0,<4.0", "google-cloud<1.0", - "datetime<5.0", "google-cloud-bigquery>=2.6.0,<3.0", "google-cloud-storage>=1.0.0,<3.0", "google-cloud-logging>=3.0,<4.0", From 8a76366906cb4033159b7b21aca04235b071c205 Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Mon, 10 Apr 2023 20:44:08 +0100 Subject: [PATCH 2/3] fix(bqsync): handle errors in routines more gracefully, copy scalar functions before views --- bqtools/__init__.py | 143 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 140 insertions(+), 3 deletions(-) diff --git a/bqtools/__init__.py b/bqtools/__init__.py index eda192c..db237c5 100644 --- a/bqtools/__init__.py +++ b/bqtools/__init__.py @@ -141,6 +141,18 @@ creation_time """ +FUNCCOMPARE = """SELECT + routine_name, + routine_body, + routine_type, + data_type, + routine_definition +FROM + `{0}.{1}.INFORMATION_SCHEMA.ROUTINES` +WHERE routine_type = "FUNCTION" +ORDER BY + 1""" + RTNCOMPARE = """SELECT routine_name, routine_body, @@ -149,6 +161,7 @@ routine_definition FROM `{0}.{1}.INFORMATION_SCHEMA.ROUTINES` +WHERE routine_type != "FUNCTION" ORDER BY 1""" @@ -3117,6 +3130,10 @@ def update_source_view_definition(self, view_definition, use_standard_sql): view_definition = view_definition.replace( r'[{}.{}.'.format(self.source_project, self.source_dataset), "[{}:{}.".format(self.destination_project, self.destination_dataset)) + # support short names + view_definition = view_definition.replace( + r'{}.'.format(self.source_dataset), + "{}.".format(self.destination_dataset)) return view_definition @@ -3765,6 +3782,10 @@ def update_source_view_definition(self, view_definition, use_standard_sql): view_definition = view_definition.replace( r'[{}.{}.'.format(src_proj, src_dataset), "[{}:{}.".format(dst_proj, dst_dataset)) + # this should not be required but seems it is + view_definition = view_definition.replace( + r'{}.'.format(src_dataset), + "{}.".format(dst_dataset)) return view_definition @@ -3813,6 +3834,7 @@ def coordinater(self, value): self.__coordinater = value def real_update_source_view_definition(self, view_definition, use_standard_sql): + # long form view_definition = view_definition.replace( r'`{}.{}.'.format(self.source_project, self.source_dataset), "`{}.{}.".format(self.destination_project, self.destination_dataset)) @@ -3823,6 +3845,10 @@ def real_update_source_view_definition(self, view_definition, use_standard_sql): view_definition = view_definition.replace( r'[{}.{}.'.format(self.source_project, self.source_dataset), "[{}:{}.".format(self.destination_project, self.destination_dataset)) + # short form + view_definition = view_definition.replace( + r'{}.'.format(self.source_dataset), + "{}.".format(self.destination_dataset)) return view_definition def discovery_update_table(self, table_api_rep, logging): @@ -5027,7 +5053,10 @@ def remove_deleted_destination_routine(copy_driver, routine_name): dstroutine_ref = bigquery.Routine( "{}.{}.{}".format(copy_driver.destination_project, copy_driver.destination_dataset, routine_name)) - copy_driver.destination_client.delete_routine(dstroutine_ref) + try: + copy_driver.destination_client.delete_routine(dstroutine_ref) + except exceptions.NotFound as e: + pass else: copy_driver.get_logger().warning( "Unable to remove routine from source {}.{}.{} as Routine class not defined in " @@ -5067,7 +5096,14 @@ def create_destination_routine(copy_driver, routine_name, routine_input): dstroutine_ref.imported_libraries = srcroutine.imported_libraries dstroutine_ref.language = srcroutine.language dstroutine_ref.type_ = srcroutine.type_ - dstroutine = copy_driver.destination_client.create_routine(dstroutine_ref) + try: + dstroutine = copy_driver.destination_client.create_routine(dstroutine_ref) + + # normally as referencing a routine not created + # so we accept this on basis next sync will resolve + except exceptions.BadRequest as e: + copy_driver.get_logger().exception(f"Unable to create routine {routine_name} in {copy_driver.destination_project}.{copy_driver.destination_dataset} definition {routine_input['routine_definition']}") + return return dstroutine else: copy_driver.get_logger().warning( @@ -5384,8 +5420,8 @@ def sync_bq_datset(copy_driver, schema_threads=10, copy_data_threads=50): wait_for_queue(schema_q, "Materialized view sychronization", 0.3, copy_driver.get_logger()) + # scalar functions have to be copied in case in views if "VIEW" in copy_driver.copy_types or "ROUTINE" in copy_driver.copy_types: - # Now do views # views need applying in order # we assume order created is the order @@ -5406,6 +5442,107 @@ def sync_bq_datset(copy_driver, schema_threads=10, copy_data_threads=50): query_cmek=copy_driver.query_cmek[1]): view_or_routine_order.append(viewrow["table_name"]) + # now list and compare views + source_routine_query = FUNCCOMPARE.format(copy_driver.source_project, + copy_driver.source_dataset) + destination_routine_query = FUNCCOMPARE.format(copy_driver.destination_project, + copy_driver.destination_dataset) + source_ended = False + destination_ended = False + + source_generator = run_query(copy_driver.query_client, source_routine_query, + "List source functions", copy_driver.get_logger(), + location=copy_driver.source_location, + callback_on_complete=copy_driver.update_job_stats, + labels=BQSYNCQUERYLABELS, + query_cmek=copy_driver.query_cmek[1]) + try: + source_row = next(source_generator) + except StopIteration: + source_ended = True + + destination_generator = run_query(copy_driver.query_client, destination_routine_query, + copy_driver.get_logger(), + "List destination functions", + location=copy_driver.destination_location, + callback_on_complete=copy_driver.update_job_stats, + labels=BQSYNCQUERYLABELS, + query_cmek=copy_driver.query_cmek[0]) + try: + destination_row = next(destination_generator) + except StopIteration: + destination_ended = True + + while not source_ended or not destination_ended: + if not destination_ended and not source_ended and destination_row["routine_name"] \ + == \ + source_row["routine_name"]: + if copy_driver.istableincluded(source_row["routine_name"]): + copy_driver.increment_routines_synced() + expected_definition = copy_driver.update_source_view_definition( + source_row["routine_definition"], source_row["routine_type"]) + if expected_definition != destination_row["routine_definition"]: + routines_to_apply[source_row["routine_name"]] = { + "routine_definition": expected_definition, + "routine_type": source_row["routine_type"], + "action": "patch_routine"} + else: + copy_driver.increment_routines_avoided() + try: + source_row = next(source_generator) + except StopIteration: + source_ended = True + try: + destination_row = next(destination_generator) + except StopIteration: + destination_ended = True + elif (destination_ended and not source_ended) or ( + not source_ended and source_row["routine_name"] < destination_row[ + "routine_name"]): + if copy_driver.istableincluded(source_row["routine_name"]): + copy_driver.increment_routines_synced() + expected_definition = copy_driver.update_source_view_definition( + source_row["routine_definition"], source_row["routine_type"]) + routines_to_apply[source_row["routine_name"]] = { + "routine_definition": expected_definition, + "routine_type": source_row["routine_type"], "action": "create_routine"} + try: + source_row = next(source_generator) + except StopIteration: + source_ended = True + elif (source_ended and not destination_ended) or ( + not destination_ended and source_row["routine_name"] > destination_row[ + "routine_name"]): + remove_deleted_destination_routine(copy_driver, destination_row["routine_name"]) + try: + destination_row = next(destination_generator) + except StopIteration: + destination_ended = True + + for view in view_or_routine_order: + if view in routines_to_apply: + if routines_to_apply[view]["action"] == "create_routine": + create_destination_routine(copy_driver, view, routines_to_apply[view]) + else: + patch_destination_routine(copy_driver, view, routines_to_apply[view]) + if view in views_to_apply: + if views_to_apply[view]["action"] == "create_view": + create_destination_view(copy_driver, view, views_to_apply[view]) + else: + patch_destination_view(copy_driver, view, views_to_apply[view]) + + wait_for_queue(schema_q, "View/Routine schema synchronization", 0.3, + copy_driver.get_logger()) + + if "VIEW" in copy_driver.copy_types or "ROUTINE" in copy_driver.copy_types: + + # Now do views + # views need applying in order + # we assume order created is the order + + views_to_apply = {} + routines_to_apply = {} + if "VIEW" in copy_driver.copy_types: # now list and compare views source_view_query = DSVIEWLISTQUERY.format(copy_driver.source_project, From 2b08474eb9c888933b0a2e70b2a69f52d5eda6f4 Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Mon, 10 Apr 2023 20:45:06 +0100 Subject: [PATCH 3/3] chore(release): update version for release and changelog --- CHANGELOG.md | 6 ++++++ bqtools/_version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb24094..41291e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +1.0.5 April 10th 2023 +- Adjust jinja dependency so will allow Jina2 version3 +- Fix issue with routine copying failing +- Fix issue with partial rather than full dataset names +- Copy scalar functions prior to views which often reference these + 1.0.4 December 6th 2022 - Loosen grpcio dependency to correct constraint diff --git a/bqtools/_version.py b/bqtools/_version.py index 87a7cf5..222a4c1 100644 --- a/bqtools/_version.py +++ b/bqtools/_version.py @@ -1 +1 @@ -__version__ = "1.0.4" \ No newline at end of file +__version__ = "1.0.5" \ No newline at end of file