From 0f0e26f0286c39886f2a4b35f5f90979b66874c5 Mon Sep 17 00:00:00 2001
From: Charlie Bini <5003326+cbini@users.noreply.github.com>
Date: Tue, 19 Sep 2023 17:17:34 +0000
Subject: [PATCH] clean up datagun

---
 src/teamster/core/datagun/assets.py          | 228 ++-----------------
 src/teamster/kippcamden/datagun/assets.py    |  19 +-
 tests/sqlalchemy/test_resource_sqlalchemy.py |  32 ---
 tests/sqlalchemy/test_sqlalchemy_resource.py |  37 +++
 4 files changed, 65 insertions(+), 251 deletions(-)
 delete mode 100644 tests/sqlalchemy/test_resource_sqlalchemy.py
 create mode 100644 tests/sqlalchemy/test_sqlalchemy_resource.py

diff --git a/src/teamster/core/datagun/assets.py b/src/teamster/core/datagun/assets.py
index 3325cf1ed..926c55071 100644
--- a/src/teamster/core/datagun/assets.py
+++ b/src/teamster/core/datagun/assets.py
@@ -5,14 +5,12 @@
 import re
 
 import pendulum
-from dagster import AssetExecutionContext, AssetKey, asset, config_from_files
+from dagster import AssetExecutionContext, AssetKey, asset
 from dagster_gcp import BigQueryResource, GCSResource
 from google.cloud import bigquery, storage
 from pandas import DataFrame
 from sqlalchemy import literal_column, select, table, text
 
-from teamster.core.google.sheets.resources import GoogleSheetsResource
-from teamster.core.sqlalchemy.resources import MSSQLResource
 from teamster.core.ssh.resources import SSHConfigurableResource
 from teamster.core.utils.classes import CustomJSONEncoder
 
@@ -120,70 +118,6 @@ def load_sftp(
             f.write(data)
 
 
-def build_sql_query_sftp_asset(
-    asset_name,
-    key_prefix,
-    query_config,
-    file_config,
-    destination_config,
-    timezone,
-    op_tags={},
-):
-    now = pendulum.now(tz=timezone)
-
-    destination_name = destination_config["name"]
-    destination_path = destination_config.get("path", "")
-
-    file_suffix = file_config["suffix"]
-    file_stem = file_config["stem"]
-
-    file_stem_fmt = file_stem.format(
-        today=now.to_date_string(), now=str(now.timestamp()).replace(".", "_")
-    )
-
-    file_name = f"{file_stem_fmt}.{file_suffix}"
-    # asset_name = (
-    #     re.sub(pattern="[^A-Za-z0-9_]", repl="", string=file_stem) + f"_{file_suffix}"
-    # )
-
-    @asset(
-        name=asset_name,
-        key_prefix=key_prefix,
-        metadata={**query_config, **file_config},
-        required_resource_keys={"db_mssql", f"ssh_{destination_config['name']}"},
-        op_tags=op_tags,
-    )
-    def sftp_extract(context):
-        sql = construct_query(
-            query_type=query_config["type"], query_value=query_config["value"], now=now
-        )
-
-        data = context.resources.db_mssql.engine.execute_query(
-            query=sql,
-            partition_size=query_config.get("partition_size", 100000),
-            output_format="dict",
-        )
-
-        if data:
-            context.log.info(f"Transforming data to {file_suffix}")
-            transformed_data = transform_data(
-                data=data,
-                file_suffix=file_suffix,
-                file_encoding=file_config.get("encoding", "utf-8"),
-                file_format=file_config.get("format", {}),
-            )
-
-            load_sftp(
-                context=context,
-                ssh=getattr(context.resources, f"ssh_{destination_name}"),
-                data=transformed_data,
-                file_name=file_name,
-                destination_path=destination_path,
-            )
-
-    return sftp_extract
-
-
 def build_bigquery_query_sftp_asset(
     code_location,
     timezone,
@@ -231,23 +165,25 @@ def _asset(context: AssetExecutionContext):
 
         data = [dict(row) for row in query_job.result()]
 
-        if data:
-            transformed_data = transform_data(
-                data=data,
-                file_suffix=file_suffix,
-                file_encoding=file_config.get("encoding", "utf-8"),
-                file_format=file_config.get("format", {}),
-            )
-            del data
-            gc.collect()
-
-            load_sftp(
-                context=context,
-                ssh=getattr(context.resources, f"ssh_{destination_name}"),
-                data=transformed_data,
-                file_name=file_name,
-                destination_path=destination_path,
-            )
+        if len(data) == 0:
+            context.log.warn("Query returned an empty result")
+
+        transformed_data = transform_data(
+            data=data,
+            file_suffix=file_suffix,
+            file_encoding=file_config.get("encoding", "utf-8"),
+            file_format=file_config.get("format", {}),
+        )
+        del data
+        gc.collect()
+
+        load_sftp(
+            context=context,
+            ssh=getattr(context.resources, f"ssh_{destination_name}"),
+            data=transformed_data,
+            file_name=file_name,
+            destination_path=destination_path,
+        )
 
     return _asset
 
@@ -387,127 +323,3 @@ def _asset(
         extract_job.result()
 
     return _asset
-
-
-def gsheet_extract_asset_factory(
-    asset_name, key_prefix, query_config, file_config, timezone, folder_id, op_tags={}
-):
-    now = pendulum.now(tz=timezone)
-
-    @asset(name=asset_name, key_prefix=key_prefix, op_tags=op_tags)
-    def gsheet_extract(
-        context: AssetExecutionContext,
-        db_mssql: MSSQLResource,
-        gsheets: GoogleSheetsResource,
-    ):
-        file_stem: str = file_config["stem"].format(
-            today=now.to_date_string(), now=str(now.timestamp()).replace(".", "_")
-        )
-
-        # gsheet title first character must be alpha
-        if file_stem[0].isnumeric():
-            file_stem = "GS" + file_stem
-
-        data = db_mssql.engine.execute_query(
-            query=construct_query(
-                query_type=query_config["type"],
-                query_value=query_config["value"],
-                now=now,
-            ),
-            partition_size=query_config.get("partition_size", 100000),
-            output_format="dict",
-        )
-
-        if not data:
-            return None
-
-        context.log.info("Transforming data to gsheet")
-        transformed_data = transform_data(data=data, file_suffix="gsheet")
-
-        context.log.info(f"Opening: {file_stem}")
-        spreadsheet = gsheets.open_or_create_sheet(title=file_stem, folder_id=folder_id)
-
-        context.log.debug(spreadsheet.url)
-
-        named_range_match = [
-            nr for nr in spreadsheet.list_named_ranges() if nr["name"] == file_stem
-        ]
-
-        if named_range_match:
-            named_range = named_range_match[0]["range"]
-            named_range_id = named_range_match[0]["namedRangeId"]
-
-            named_range_sheet_id = named_range.get("sheetId", 0)
-            end_row_ix = named_range.get("endRowIndex", 0)
-            end_col_ix = named_range.get("endColumnIndex", 0)
-
-            range_area = (end_row_ix + 1) * (end_col_ix + 1)
-        else:
-            named_range_id = None
-            range_area = 0
-
-        worksheet = (
-            spreadsheet.get_worksheet_by_id(id=named_range_sheet_id)
-            if named_range_id
-            else spreadsheet.sheet1
-        )
-
-        nrows, ncols = transformed_data["shape"]
-        nrows = nrows + 1  # header row
-        transformed_data_area = nrows * ncols
-
-        # resize worksheet
-        worksheet_area = worksheet.row_count * worksheet.col_count
-        if worksheet_area != transformed_data_area:
-            context.log.debug(f"Resizing worksheet area to {nrows}x{ncols}.")
-            worksheet.resize(rows=nrows, cols=ncols)
-
-        # resize named range
-        if range_area != transformed_data_area:
-            start_cell = gsheets.rowcol_to_a1(1, 1)
-            end_cell = gsheets.rowcol_to_a1(nrows, ncols)
-
-            context.log.debug(
-                f"Resizing named range '{file_stem}' to {start_cell}:{end_cell}."
-            )
-
-            worksheet.delete_named_range(named_range_id=named_range_id)
-            worksheet.define_named_range(
-                name=f"{start_cell}:{end_cell}", range_name=file_stem
-            )
-
-        context.log.debug(f"Clearing '{file_stem}' values.")
-        worksheet.batch_clear([file_stem])
-
-        context.log.info(f"Updating '{file_stem}': {transformed_data_area} cells.")
-        worksheet.update(
-            file_stem, [transformed_data["columns"]] + transformed_data["data"]
-        )
-
-    return gsheet_extract
-
-
-def generate_extract_assets(code_location, name, extract_type, timezone):
-    cfg = config_from_files(
-        [f"src/teamster/{code_location}/datagun/config/{name}.yaml"]
-    )
-
-    assets = []
-    for ac in cfg["assets"]:
-        if extract_type == "sftp":
-            assets.append(
-                build_sql_query_sftp_asset(
-                    key_prefix=cfg["key_prefix"], timezone=timezone, **ac
-                )
-            )
-        elif extract_type == "gsheet":
-            assets.append(
-                gsheet_extract_asset_factory(
-                    key_prefix=cfg["key_prefix"],
-                    folder_id=cfg["folder_id"],
-                    timezone=timezone,
-                    **ac,
-                )
-            )
-
-    return assets
diff --git a/src/teamster/kippcamden/datagun/assets.py b/src/teamster/kippcamden/datagun/assets.py
index d7ce3b226..0522a3e2c 100644
--- a/src/teamster/kippcamden/datagun/assets.py
+++ b/src/teamster/kippcamden/datagun/assets.py
@@ -1,20 +1,17 @@
 from dagster import config_from_files
 
-from teamster.core.datagun.assets import (
-    build_bigquery_extract_asset,
-    generate_extract_assets,
-)
+from teamster.core.datagun.assets import build_bigquery_extract_asset
 
 from .. import CODE_LOCATION, LOCAL_TIMEZONE
 
 config_dir = f"src/teamster/{CODE_LOCATION}/datagun/config"
 
-cpn_extract_assets = generate_extract_assets(
-    code_location=CODE_LOCATION,
-    name="cpn",
-    extract_type="sftp",
-    timezone=LOCAL_TIMEZONE,
-)
+# cpn_extract_assets = generate_extract_assets(
+#     code_location=CODE_LOCATION,
+#     name="cpn",
+#     extract_type="sftp",
+#     timezone=LOCAL_TIMEZONE,
+# )
 
 powerschool_extract_assets = [
     build_bigquery_extract_asset(
@@ -24,6 +21,6 @@
 ]
 
 __all__ = [
-    *cpn_extract_assets,
+    # *cpn_extract_assets,
     *powerschool_extract_assets,
 ]
diff --git a/tests/sqlalchemy/test_resource_sqlalchemy.py b/tests/sqlalchemy/test_resource_sqlalchemy.py
deleted file mode 100644
index 2440f635c..000000000
--- a/tests/sqlalchemy/test_resource_sqlalchemy.py
+++ /dev/null
@@ -1,32 +0,0 @@
-import pendulum
-from dagster import build_resources
-from dagster_gcp import BigQueryResource
-from google.cloud import bigquery
-
-from teamster.core.datagun.assets import construct_query
-from teamster.kipptaf import GCS_PROJECT_NAME
-
-QUERY = {
-    "query_type": "schema",
-    "query_value": {
-        "table": {"name": "rpt_idauto__staff_roster", "schema": "kipptaf_extracts"},
-    },
-}
-
-
-def test_resource():
-    with build_resources(
-        resources={"db_bigquery": BigQueryResource(project=GCS_PROJECT_NAME)}
-    ) as resources:
-        db_bigquery: bigquery.Client = next(resources.db_bigquery)
-
-        query = construct_query(now=pendulum.now(), **QUERY)
-        print(query)
-
-        query_job = db_bigquery.query(query=str(query))
-
-        result = query_job.result()
-
-        data = [dict(row) for row in result]
-
-        print(data)
diff --git a/tests/sqlalchemy/test_sqlalchemy_resource.py b/tests/sqlalchemy/test_sqlalchemy_resource.py
new file mode 100644
index 000000000..631c72b45
--- /dev/null
+++ b/tests/sqlalchemy/test_sqlalchemy_resource.py
@@ -0,0 +1,37 @@
+import pendulum
+from dagster import build_resources
+from dagster_gcp import BigQueryResource
+from google.cloud import bigquery
+
+from teamster.core.datagun.assets import construct_query
+from teamster.kipptaf import GCS_PROJECT_NAME
+
+
+def _test(query_type, query_value):
+    with build_resources(
+        resources={"db_bigquery": BigQueryResource(project=GCS_PROJECT_NAME)}
+    ) as resources:
+        db_bigquery: bigquery.Client = next(resources.db_bigquery)
+
+        query = construct_query(
+            now=pendulum.now(), query_type=query_type, query_value=query_value
+        )
+        print(query)
+
+        query_job = db_bigquery.query(query=str(query))
+
+        result = query_job.result()
+
+        data = [dict(row) for row in result]
+
+        print(data)
+
+
+def test_schema_query():
+    table_schema = "kipptaf_extracts"
+    table_name = "rpt_deanslist__mod_assessment"
+
+    _test(
+        query_type="schema",
+        query_value={"table": {"name": table_name, "schema": table_schema}},
+    )