clean up datagun
cbini committed Sep 19, 2023
1 parent 1238145 commit 0f0e26f
Showing 4 changed files with 65 additions and 251 deletions.
228 changes: 20 additions & 208 deletions src/teamster/core/datagun/assets.py
@@ -5,14 +5,12 @@
import re

import pendulum
-from dagster import AssetExecutionContext, AssetKey, asset, config_from_files
+from dagster import AssetExecutionContext, AssetKey, asset
from dagster_gcp import BigQueryResource, GCSResource
from google.cloud import bigquery, storage
from pandas import DataFrame
from sqlalchemy import literal_column, select, table, text

-from teamster.core.google.sheets.resources import GoogleSheetsResource
-from teamster.core.sqlalchemy.resources import MSSQLResource
from teamster.core.ssh.resources import SSHConfigurableResource
from teamster.core.utils.classes import CustomJSONEncoder

@@ -120,70 +118,6 @@ def load_sftp(
f.write(data)


def build_sql_query_sftp_asset(
asset_name,
key_prefix,
query_config,
file_config,
destination_config,
timezone,
op_tags={},
):
now = pendulum.now(tz=timezone)

destination_name = destination_config["name"]
destination_path = destination_config.get("path", "")

file_suffix = file_config["suffix"]
file_stem = file_config["stem"]

file_stem_fmt = file_stem.format(
today=now.to_date_string(), now=str(now.timestamp()).replace(".", "_")
)

file_name = f"{file_stem_fmt}.{file_suffix}"
# asset_name = (
# re.sub(pattern="[^A-Za-z0-9_]", repl="", string=file_stem) + f"_{file_suffix}"
# )

@asset(
name=asset_name,
key_prefix=key_prefix,
metadata={**query_config, **file_config},
required_resource_keys={"db_mssql", f"ssh_{destination_config['name']}"},
op_tags=op_tags,
)
def sftp_extract(context):
sql = construct_query(
query_type=query_config["type"], query_value=query_config["value"], now=now
)

data = context.resources.db_mssql.engine.execute_query(
query=sql,
partition_size=query_config.get("partition_size", 100000),
output_format="dict",
)

if data:
context.log.info(f"Transforming data to {file_suffix}")
transformed_data = transform_data(
data=data,
file_suffix=file_suffix,
file_encoding=file_config.get("encoding", "utf-8"),
file_format=file_config.get("format", {}),
)

load_sftp(
context=context,
ssh=getattr(context.resources, f"ssh_{destination_name}"),
data=transformed_data,
file_name=file_name,
destination_path=destination_path,
)

return sftp_extract


def build_bigquery_query_sftp_asset(
code_location,
timezone,
@@ -231,23 +165,25 @@ def _asset(context: AssetExecutionContext):

data = [dict(row) for row in query_job.result()]

-if data:
-transformed_data = transform_data(
-data=data,
-file_suffix=file_suffix,
-file_encoding=file_config.get("encoding", "utf-8"),
-file_format=file_config.get("format", {}),
-)
-del data
-gc.collect()
-
-load_sftp(
-context=context,
-ssh=getattr(context.resources, f"ssh_{destination_name}"),
-data=transformed_data,
-file_name=file_name,
-destination_path=destination_path,
-)
+if len(data) == 0:
+context.log.warn("Query returned an empty result")
+
+transformed_data = transform_data(
+data=data,
+file_suffix=file_suffix,
+file_encoding=file_config.get("encoding", "utf-8"),
+file_format=file_config.get("format", {}),
+)
+del data
+gc.collect()
+
+load_sftp(
+context=context,
+ssh=getattr(context.resources, f"ssh_{destination_name}"),
+data=transformed_data,
+file_name=file_name,
+destination_path=destination_path,
+)

return _asset

Expand Down Expand Up @@ -387,127 +323,3 @@ def _asset(
extract_job.result()

return _asset


def gsheet_extract_asset_factory(
asset_name, key_prefix, query_config, file_config, timezone, folder_id, op_tags={}
):
now = pendulum.now(tz=timezone)

@asset(name=asset_name, key_prefix=key_prefix, op_tags=op_tags)
def gsheet_extract(
context: AssetExecutionContext,
db_mssql: MSSQLResource,
gsheets: GoogleSheetsResource,
):
file_stem: str = file_config["stem"].format(
today=now.to_date_string(), now=str(now.timestamp()).replace(".", "_")
)

# gsheet title first character must be alpha
if file_stem[0].isnumeric():
file_stem = "GS" + file_stem

data = db_mssql.engine.execute_query(
query=construct_query(
query_type=query_config["type"],
query_value=query_config["value"],
now=now,
),
partition_size=query_config.get("partition_size", 100000),
output_format="dict",
)

if not data:
return None

context.log.info("Transforming data to gsheet")
transformed_data = transform_data(data=data, file_suffix="gsheet")

context.log.info(f"Opening: {file_stem}")
spreadsheet = gsheets.open_or_create_sheet(title=file_stem, folder_id=folder_id)

context.log.debug(spreadsheet.url)

named_range_match = [
nr for nr in spreadsheet.list_named_ranges() if nr["name"] == file_stem
]

if named_range_match:
named_range = named_range_match[0]["range"]
named_range_id = named_range_match[0]["namedRangeId"]

named_range_sheet_id = named_range.get("sheetId", 0)
end_row_ix = named_range.get("endRowIndex", 0)
end_col_ix = named_range.get("endColumnIndex", 0)

range_area = (end_row_ix + 1) * (end_col_ix + 1)
else:
named_range_id = None
range_area = 0

worksheet = (
spreadsheet.get_worksheet_by_id(id=named_range_sheet_id)
if named_range_id
else spreadsheet.sheet1
)

nrows, ncols = transformed_data["shape"]
nrows = nrows + 1 # header row
transformed_data_area = nrows * ncols

# resize worksheet
worksheet_area = worksheet.row_count * worksheet.col_count
if worksheet_area != transformed_data_area:
context.log.debug(f"Resizing worksheet area to {nrows}x{ncols}.")
worksheet.resize(rows=nrows, cols=ncols)

# resize named range
if range_area != transformed_data_area:
start_cell = gsheets.rowcol_to_a1(1, 1)
end_cell = gsheets.rowcol_to_a1(nrows, ncols)

context.log.debug(
f"Resizing named range '{file_stem}' to {start_cell}:{end_cell}."
)

worksheet.delete_named_range(named_range_id=named_range_id)
worksheet.define_named_range(
name=f"{start_cell}:{end_cell}", range_name=file_stem
)

context.log.debug(f"Clearing '{file_stem}' values.")
worksheet.batch_clear([file_stem])

context.log.info(f"Updating '{file_stem}': {transformed_data_area} cells.")
worksheet.update(
file_stem, [transformed_data["columns"]] + transformed_data["data"]
)

return gsheet_extract


def generate_extract_assets(code_location, name, extract_type, timezone):
cfg = config_from_files(
[f"src/teamster/{code_location}/datagun/config/{name}.yaml"]
)

assets = []
for ac in cfg["assets"]:
if extract_type == "sftp":
assets.append(
build_sql_query_sftp_asset(
key_prefix=cfg["key_prefix"], timezone=timezone, **ac
)
)
elif extract_type == "gsheet":
assets.append(
gsheet_extract_asset_factory(
key_prefix=cfg["key_prefix"],
folder_id=cfg["folder_id"],
timezone=timezone,
**ac,
)
)

return assets
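For reference, the removed generate_extract_assets loader read a per-location YAML file via config_from_files and fanned each entry out to one of the factories above. A rough sketch of the shape it appears to have expected, written here as a Python dict with hypothetical values (field names are taken from the removed factories; the actual YAML files are not part of this commit):

# Hypothetical illustration only -- not part of this commit. Field names come from
# the removed build_sql_query_sftp_asset / gsheet_extract_asset_factory above;
# all values are made up.
example_config = {
    "key_prefix": ["example_location", "extracts"],
    "folder_id": "<google-drive-folder-id>",  # read only by the gsheet branch
    "assets": [
        {
            "asset_name": "example_sftp_extract",
            "query_config": {
                "type": "schema",
                "value": {"table": {"name": "example_table", "schema": "dbo"}},
                "partition_size": 100000,
            },
            "file_config": {
                "stem": "example_{today}",
                "suffix": "csv",
                "encoding": "utf-8",
                "format": {},
            },
            "destination_config": {"name": "example_vendor", "path": "inbound"},
        }
    ],
}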
19 changes: 8 additions & 11 deletions src/teamster/kippcamden/datagun/assets.py
@@ -1,20 +1,17 @@
from dagster import config_from_files

-from teamster.core.datagun.assets import (
-build_bigquery_extract_asset,
-generate_extract_assets,
-)
+from teamster.core.datagun.assets import build_bigquery_extract_asset

from .. import CODE_LOCATION, LOCAL_TIMEZONE

config_dir = f"src/teamster/{CODE_LOCATION}/datagun/config"

-cpn_extract_assets = generate_extract_assets(
-code_location=CODE_LOCATION,
-name="cpn",
-extract_type="sftp",
-timezone=LOCAL_TIMEZONE,
-)
+# cpn_extract_assets = generate_extract_assets(
+#     code_location=CODE_LOCATION,
+#     name="cpn",
+#     extract_type="sftp",
+#     timezone=LOCAL_TIMEZONE,
+# )

powerschool_extract_assets = [
build_bigquery_extract_asset(
@@ -24,6 +21,6 @@
]

__all__ = [
-*cpn_extract_assets,
+# *cpn_extract_assets,
*powerschool_extract_assets,
]
32 changes: 0 additions & 32 deletions tests/sqlalchemy/test_resource_sqlalchemy.py

This file was deleted.

37 changes: 37 additions & 0 deletions tests/sqlalchemy/test_sqlalchemy_resource.py
@@ -0,0 +1,37 @@
import pendulum
from dagster import build_resources
from dagster_gcp import BigQueryResource
from google.cloud import bigquery

from teamster.core.datagun.assets import construct_query
from teamster.kipptaf import GCS_PROJECT_NAME


def _test(query_type, query_value):
with build_resources(
resources={"db_bigquery": BigQueryResource(project=GCS_PROJECT_NAME)}
) as resources:
db_bigquery: bigquery.Client = next(resources.db_bigquery)

query = construct_query(
now=pendulum.now(), query_type=query_type, query_value=query_value
)
print(query)

query_job = db_bigquery.query(query=str(query))

result = query_job.result()

data = [dict(row) for row in result]

print(data)


def test_schema_query():
table_schema = "kipptaf_extracts"
table_name = "rpt_deanslist__mod_assessment"

_test(
query_type="schema",
query_value={"table": {"name": table_name, "schema": table_schema}},
)
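As a quick sanity check outside of Dagster's build_resources helper, the same "schema" query can also be run directly with the plain google-cloud-bigquery client. A minimal sketch, assuming application-default credentials are configured and that GCS_PROJECT_NAME resolves to the target project (the exact SQL text that construct_query renders is not shown in this commit):

# Sketch only -- mirrors test_schema_query above, without the build_resources wrapper.
# Assumes ADC credentials and network access to BigQuery.
import pendulum
from google.cloud import bigquery

from teamster.core.datagun.assets import construct_query
from teamster.kipptaf import GCS_PROJECT_NAME

query = construct_query(
    now=pendulum.now(),
    query_type="schema",
    query_value={
        "table": {"name": "rpt_deanslist__mod_assessment", "schema": "kipptaf_extracts"}
    },
)

client = bigquery.Client(project=GCS_PROJECT_NAME)
rows = [dict(row) for row in client.query(query=str(query)).result()]
print(rows)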
