From d34a42aa7678c50b488ff1810f3fbe38993806ef Mon Sep 17 00:00:00 2001
From: Kamesh Sampath
Date: Wed, 26 Jun 2024 17:08:55 +0530
Subject: [PATCH 1/3] (refactor): simplify

- remove unnecessary git secret setup
- remove snowflake objects setup from git_config
---
 steps/03_git_config.sql | 14 +-------------
 1 file changed, 1 insertion(+), 13 deletions(-)

diff --git a/steps/03_git_config.sql b/steps/03_git_config.sql
index 123f10a..1c5dbd7 100644
--- a/steps/03_git_config.sql
+++ b/steps/03_git_config.sql
@@ -17,10 +17,7 @@ GRANT OWNERSHIP ON SCHEMA PUBLIC TO ROLE GIT_ADMIN;
 USE ROLE GIT_ADMIN;
 USE DATABASE GIT_REPO;
 USE SCHEMA PUBLIC;
-CREATE OR REPLACE SECRET GIT_SECRET
-  TYPE = PASSWORD
-  USERNAME = '')
-  ALLOWED_AUTHENTICATION_SECRETS = (GIT_SECRET)
   ENABLED = TRUE;
 CREATE OR REPLACE GIT REPOSITORY DE_QUICKSTART
   API_INTEGRATION = GIT_API_INTEGRATION
-  GIT_CREDENTIALS = GIT_SECRET
   ORIGIN = '';

 SHOW GIT BRANCHES IN DE_QUICKSTART;
 ls @DE_QUICKSTART/branches/main;
-
-USE ROLE ACCOUNTADMIN;
-SET MY_USER = CURRENT_USER();
-EXECUTE IMMEDIATE
-  FROM @GIT_REPO.PUBLIC.DE_QUICKSTART/branches/main/steps/03_setup_snowflake.sql
-  USING (MY_USER=>$MY_USER);
-
\ No newline at end of file

From f8518fe72777c288a0c14f1b313d3fc530d83f1b Mon Sep 17 00:00:00 2001
From: Kamesh Sampath
Date: Wed, 26 Jun 2024 17:26:51 +0530
Subject: [PATCH 2/3] (fix): Set the Database and Schema context

- Set the right database context to use i.e `HOL_DB`
- Set the right schema context to use `HOL_SCHEMA`
---
 app/05_raw_data.py                | 65 +++++++++++++++++--------
 app/06_load_daily_city_metrics.py | 79 ++++++++++++++++++++++---------
 steps/07_deploy_task_dag.py       | 26 +++++++---
 3 files changed, 122 insertions(+), 48 deletions(-)

diff --git a/app/05_raw_data.py b/app/05_raw_data.py
index c290816..0235881 100644
--- a/app/05_raw_data.py
+++ b/app/05_raw_data.py
@@ -1,70 +1,97 @@
-#------------------------------------------------------------------------------
+# ------------------------------------------------------------------------------
 # Hands-On Lab: Data Engineering with Snowpark
 # Script: 02_load_raw.py
 # Author: Jeremiah Hansen, Caleb Baechtold
 # Last Updated: 1/9/2023
-#------------------------------------------------------------------------------
+# ------------------------------------------------------------------------------

 import time
 from snowflake.snowpark import Session

-POS_TABLES = ['country', 'franchise', 'location', 'menu', 'truck', 'order_header', 'order_detail']
-CUSTOMER_TABLES = ['customer_loyalty']
+POS_TABLES = [
+    "country",
+    "franchise",
+    "location",
+    "menu",
+    "truck",
+    "order_header",
+    "order_detail",
+]
+CUSTOMER_TABLES = ["customer_loyalty"]
 TABLE_DICT = {
     "pos": {"schema": "RAW_POS", "tables": POS_TABLES},
-    "customer": {"schema": "RAW_CUSTOMER", "tables": CUSTOMER_TABLES}
+    "customer": {"schema": "RAW_CUSTOMER", "tables": CUSTOMER_TABLES},
 }

 # SNOWFLAKE ADVANTAGE: Schema detection
 # SNOWFLAKE ADVANTAGE: Data ingestion with COPY
 # SNOWFLAKE ADVANTAGE: Snowflake Tables (not file-based)

+
 def load_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
     session.use_schema(schema)
     if year is None:
         location = "@external.frostbyte_raw_stage/{}/{}".format(s3dir, tname)
     else:
-        print('\tLoading year {}'.format(year))
-        location = "@external.frostbyte_raw_stage/{}/{}/year={}".format(s3dir, tname, year)
-
+        print("\tLoading year {}".format(year))
+        location = "@external.frostbyte_raw_stage/{}/{}/year={}".format(
+            s3dir, tname, year
+        )
+
     # we can infer schema using the parquet read option
-    df = session.read.option("compression", "snappy") \
-                  .parquet(location)
+    df = session.read.option("compression", "snappy").parquet(location)
     df.copy_into_table("{}".format(tname))

+
 # SNOWFLAKE ADVANTAGE: Warehouse elasticity (dynamic scaling)

+
 def load_all_raw_tables(session):
-    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()
+    _ = session.sql(
+        "ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE"
+    ).collect()

     for s3dir, data in TABLE_DICT.items():
-        tnames = data['tables']
-        schema = data['schema']
+        tnames = data["tables"]
+        schema = data["schema"]

         for tname in tnames:
             print("Loading {}".format(tname))
             # Only load the first 3 years of data for the order tables at this point
             # We will load the 2022 data later in the lab
-            if tname in ['order_header', 'order_detail']:
-                for year in ['2019', '2020', '2021']:
-                    load_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
+            if tname in ["order_header", "order_detail"]:
+                for year in ["2019", "2020", "2021"]:
+                    load_raw_table(
+                        session, tname=tname, s3dir=s3dir, year=year, schema=schema
+                    )
             else:
                 load_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)

     _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

+
 def validate_raw_tables(session):
     # check column names from the inferred schema
     for tname in POS_TABLES:
-        print('{}: \n\t{}\n'.format(tname, session.table('RAW_POS.{}'.format(tname)).columns))
+        print(
+            "{}: \n\t{}\n".format(
+                tname, session.table("RAW_POS.{}".format(tname)).columns
+            )
+        )

     for tname in CUSTOMER_TABLES:
-        print('{}: \n\t{}\n'.format(tname, session.table('RAW_CUSTOMER.{}'.format(tname)).columns))
+        print(
+            "{}: \n\t{}\n".format(
+                tname, session.table("RAW_CUSTOMER.{}".format(tname)).columns
+            )
+        )


 # For local debugging
 if __name__ == "__main__":
     # Create a local Snowpark session
     with Session.builder.getOrCreate() as session:
+        # Set the right database context to use
+        session.use_database("HOL_DB")
         load_all_raw_tables(session)
-        validate_raw_tables(session)
\ No newline at end of file
+        validate_raw_tables(session)
diff --git a/app/06_load_daily_city_metrics.py b/app/06_load_daily_city_metrics.py
index 457d4df..04e67aa 100644
--- a/app/06_load_daily_city_metrics.py
+++ b/app/06_load_daily_city_metrics.py
@@ -1,12 +1,20 @@
 from snowflake.snowpark import Session
 import snowflake.snowpark.functions as F

-def table_exists(session, schema='', name=''):
-    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
-    return exists
+
+def table_exists(session, schema="", name=""):
+    exists = session.sql(
+        "SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(
+            schema, name
+        )
+    ).collect()[0]["TABLE_EXISTS"]
+    return exists

+
 def main(session: Session) -> str:
-    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()
+    _ = session.sql(
+        "ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE"
+    ).collect()

     schema_name = "HOL_SCHEMA"
     table_name = "DAILY_CITY_METRICS"
@@ -17,41 +25,66 @@ def main(session: Session) -> str:
     location = session.table("RAW_POS.LOCATION")

     # Join the tables
-    orders = order_header.join(order_detail, order_header['ORDER_ID'] == order_detail['ORDER_ID'])
-    orders = orders.join(location, orders['LOCATION_ID'] == location['LOCATION_ID'])
-    order_detail = orders.join(history_day, (F.builtin("DATE")(order_header['ORDER_TS']) == history_day['DATE_VALID_STD']) & (orders['ISO_COUNTRY_CODE'] == history_day['COUNTRY']) & (orders['CITY'] == history_day['CITY_NAME']))
+    orders = order_header.join(
+        order_detail, order_header["ORDER_ID"] == order_detail["ORDER_ID"]
+    )
+    orders = orders.join(location, orders["LOCATION_ID"] == location["LOCATION_ID"])
+    order_detail = orders.join(
+        history_day,
+        (F.builtin("DATE")(order_header["ORDER_TS"]) == history_day["DATE_VALID_STD"])
+        & (orders["ISO_COUNTRY_CODE"] == history_day["COUNTRY"])
+        & (orders["CITY"] == history_day["CITY_NAME"]),
+    )

     # Aggregate the data
-    final_agg = order_detail.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('ISO_COUNTRY_CODE')) \
-                            .agg( \
-                                F.sum('PRICE').alias('DAILY_SALES_SUM'), \
-                                F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"), \
-                                F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"), \
-                            ) \
-                            .select(F.col("DATE_VALID_STD").alias("DATE"), F.col("CITY_NAME"), F.col("ISO_COUNTRY_CODE").alias("COUNTRY_DESC"), \
-                                    F.builtin("ZEROIFNULL")(F.col("DAILY_SALES_SUM")).alias("DAILY_SALES"), \
-                                    F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"), \
-                                    F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \
-                            )
+    final_agg = (
+        order_detail.group_by(
+            F.col("DATE_VALID_STD"), F.col("CITY_NAME"), F.col("ISO_COUNTRY_CODE")
+        )
+        .agg(
+            F.sum("PRICE").alias("DAILY_SALES_SUM"),
+            F.avg("AVG_TEMPERATURE_AIR_2M_F").alias("AVG_TEMPERATURE_F"),
+            F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"),
+        )
+        .select(
+            F.col("DATE_VALID_STD").alias("DATE"),
+            F.col("CITY_NAME"),
+            F.col("ISO_COUNTRY_CODE").alias("COUNTRY_DESC"),
+            F.builtin("ZEROIFNULL")(F.col("DAILY_SALES_SUM")).alias("DAILY_SALES"),
+            F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"),
+            F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"),
+        )
+    )

     session.use_schema(schema_name)

     # If the table doesn't exist then create it
     if not table_exists(session, schema=schema_name, name=table_name):
         final_agg.write.mode("overwrite").save_as_table(table_name)

-        _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()
+        _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

         return f"Successfully created {table_name}"
     # Otherwise update it
     else:
         cols_to_update = {c: final_agg[c] for c in final_agg.schema.names}

         dcm = session.table(table_name)
-        dcm.merge(final_agg, (dcm['DATE'] == final_agg['DATE']) & (dcm['CITY_NAME'] == final_agg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == final_agg['COUNTRY_DESC']), \
-                  [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)])
+        dcm.merge(
+            final_agg,
+            (dcm["DATE"] == final_agg["DATE"])
+            & (dcm["CITY_NAME"] == final_agg["CITY_NAME"])
+            & (dcm["COUNTRY_DESC"] == final_agg["COUNTRY_DESC"]),
+            [
+                F.when_matched().update(cols_to_update),
+                F.when_not_matched().insert(cols_to_update),
+            ],
+        )

-        _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()
+        _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

         return f"Successfully updated {table_name}"
-
+
+
 # For local debugging
 if __name__ == "__main__":
     # Create a local Snowpark session
     with Session.builder.getOrCreate() as session:
-        main(session)
\ No newline at end of file
+        # Set the right database context to use
+        session.use_database("HOL_DB")
+        main(session)
diff --git a/steps/07_deploy_task_dag.py b/steps/07_deploy_task_dag.py
index 67742f0..3ce6e74 100644
--- a/steps/07_deploy_task_dag.py
+++ b/steps/07_deploy_task_dag.py
@@ -1,9 +1,9 @@
-#------------------------------------------------------------------------------
+# ------------------------------------------------------------------------------
 # Hands-On Lab: Intro to Data Engineering with Snowpark Python
 # Script: 07_deploy_task_dag.py
 # Author: Jeremiah Hansen
 # Last Updated: 9/26/2023
-#------------------------------------------------------------------------------
+# ------------------------------------------------------------------------------

 # SNOWFLAKE ADVANTAGE: Snowpark Python API
 # SNOWFLAKE ADVANTAGE: Snowpark Python Task DAG API
@@ -16,12 +16,18 @@
 from snowflake.core.task import StoredProcedureCall, Task
 from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask

+
 # Create the tasks using the DAG API
 def main(session: Session) -> str:
     database_name = "HOL_DB"
     schema_name = "HOL_SCHEMA"
     warehouse_name = "HOL_WH"

+    # set database context
+    session.use_database(database_name)
+    # set database schema context
+    session.use_schema(schema_name)
+
     api_root = Root(session)
     schema = api_root.databases[database_name].schemas[schema_name]
     tasks = schema.tasks
@@ -30,8 +36,16 @@ def main(session: Session) -> str:
     dag_name = "HOL_DAG"
     dag = DAG(dag_name, schedule=timedelta(days=1), warehouse=warehouse_name)
     with dag:
-        dag_task1 = DAGTask("LOAD_ORDER_DETAIL_TASK", definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/order_detail.xlsx'), 'order_detail', 'ORDER_DETAIL')", warehouse=warehouse_name)
-        dag_task2 = DAGTask("LOAD_DAILY_CITY_METRICS_TASK", definition="CALL LOAD_DAILY_CITY_METRICS_SP()", warehouse=warehouse_name)
+        dag_task1 = DAGTask(
+            "LOAD_ORDER_DETAIL_TASK",
+            definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@EXTERNAL.FROSTBYTE_RAW_STAGE, 'intro/order_detail.xlsx'), 'order_detail', 'ORDER_DETAIL')",
+            warehouse=warehouse_name,
+        )
+        dag_task2 = DAGTask(
+            "LOAD_DAILY_CITY_METRICS_TASK",
+            definition="CALL LOAD_DAILY_CITY_METRICS_SP()",
+            warehouse=warehouse_name,
+        )

         dag_task2 >> dag_task1

     # Create the DAG in Snowflake
     dag_op = DAGOperation(schema)
     dag_op.deploy(dag, mode="orreplace")

-    dagiter = dag_op.iter_dags(like='hol_dag%')
+    dagiter = dag_op.iter_dags(like="hol_dag%")
     for dag_name in dagiter:
         print(dag_name)

 # For local debugging
 # Be aware you may need to type-convert arguments if you add input parameters
-if __name__ == '__main__':
+if __name__ == "__main__":
     with Session.builder.getOrCreate() as session:
         main(session)

From 60ab3a4e60f9f191f58ae13bc4094d41b1d6b0d8 Mon Sep 17 00:00:00 2001
From: Kamesh Sampath
Date: Wed, 26 Jun 2024 17:35:43 +0530
Subject: [PATCH 3/3] (chore): add snowflake package tasks api

---
 requirements.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 344611f..45d61f9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
-snowflake-snowpark-python
\ No newline at end of file
+snowflake-snowpark-python
+snowflake
\ No newline at end of file