diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..f5daa2c --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,19 @@ +FROM mcr.microsoft.com/devcontainers/miniconda:0-3 + +USER vscode +WORKDIR /home/vscode + +# Configure SnowSQL +RUN mkdir .snowsql +COPY .devcontainer/config .snowsql + +# Install SnowSQL +RUN curl -O https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.28-linux_x86_64.bash \ + && SNOWSQL_DEST=~/bin SNOWSQL_LOGIN_SHELL=~/.profile bash snowsql-1.2.28-linux_x86_64.bash \ + && rm snowsql-1.2.28-linux_x86_64.bash + +# Create the conda environment +COPY environment.yml . +RUN conda env create \ + && conda init \ + && rm environment.yml \ No newline at end of file diff --git a/.devcontainer/config b/.devcontainer/config new file mode 100644 index 0000000..0248c69 --- /dev/null +++ b/.devcontainer/config @@ -0,0 +1,9 @@ +# SnowSQL config + +[connections.dev] +accountname = myaccount +username = myusername +password = mypassword +rolename = HOL_ROLE +warehousename = HOL_WH +dbname = HOL_DB \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..0b65f30 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,31 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/miniconda +{ + "name": "Snowflake Demo Codespace", + "build": { + "context": "..", + "dockerfile": "Dockerfile" + }, + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [] + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "conda init", + + // Configure tool-specific properties. + "customizations": { + "vscode": { + "settings": { + "python.defaultInterpreterPath": "/opt/conda/envs/snowflake-demo", + "python.terminal.activateEnvInCurrentTerminal": true + }, + "extensions": [ + "snowflake.snowflake-vsc" + ] + } + } + } \ No newline at end of file diff --git a/.github/workflows/build_and_deploy.yaml b/.github/workflows/build_and_deploy.yaml new file mode 100644 index 0000000..f4d5478 --- /dev/null +++ b/.github/workflows/build_and_deploy.yaml @@ -0,0 +1,48 @@ +name: Deploy Snowpark Apps + +# Controls when the action will run. 
+on: + push: + branches: + - main + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +jobs: + deploy: + runs-on: ubuntu-latest + + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Setup Python 3.10 + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install Python packages + run: pip install -r requirements.txt + + - name: Configure snowcli + env: + SNOWSQL_ACCOUNT: ${{ secrets.SNOWSQL_ACCOUNT }} + SNOWSQL_USER: ${{ secrets.SNOWSQL_USER }} + SNOWSQL_PWD: ${{ secrets.SNOWSQL_PWD }} + SNOWSQL_ROLE: ${{ secrets.SNOWSQL_ROLE }} + SNOWSQL_WAREHOUSE: ${{ secrets.SNOWSQL_WAREHOUSE }} + SNOWSQL_DATABASE: ${{ secrets.SNOWSQL_DATABASE }} + run: | + cd $GITHUB_WORKSPACE + echo "[connections.dev]" > config + echo "accountname = $SNOWSQL_ACCOUNT" >> config + echo "username = $SNOWSQL_USER" >> config + echo "password = $SNOWSQL_PWD" >> config + echo "rolename = $SNOWSQL_ROLE" >> config + echo "warehousename = $SNOWSQL_WAREHOUSE" >> config + echo "dbname = $SNOWSQL_DATABASE" >> config + + - name: Deploy Snowpark apps + run: python deploy_snowpark_apps.py $GITHUB_WORKSPACE diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aa652da --- /dev/null +++ b/.gitignore @@ -0,0 +1,135 @@ +# Mac files +.DS_Store + +# Snowpark specific files +creds.json + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
diff --git a/LEGAL.md b/LEGAL.md
new file mode 100644
index 0000000..e52a5f1
--- /dev/null
+++ b/LEGAL.md
@@ -0,0 +1 @@
+This application is not part of the Snowflake Service and is governed by the terms in LICENSE, unless expressly agreed to in writing. You use this application at your own risk, and Snowflake has no obligation to support your use of this application.
diff --git a/README.md b/README.md
index 53d7b06..3a45a35 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,9 @@
-# sfguide-data-engineering-with-snowpark-python-intro
\ No newline at end of file
+# Intro to Data Engineering with Snowpark Python
+This repository contains the code for the *Intro to Data Engineering with Snowpark Python* Snowflake Quickstart.
+
+### ➡️ For overview, prerequisites, and to learn more, complete this end-to-end tutorial [Intro to Data Engineering with Snowpark Python](https://quickstarts.snowflake.com/) on quickstarts.snowflake.com.
+
+___
+Here is an overview of what we'll build in this lab:
+
+
diff --git a/data/location.xlsx b/data/location.xlsx
new file mode 100644
index 0000000..221f2ca
Binary files /dev/null and b/data/location.xlsx differ
diff --git a/data/order_detail.xlsx b/data/order_detail.xlsx
new file mode 100644
index 0000000..26c7638
Binary files /dev/null and b/data/order_detail.xlsx differ
diff --git a/deploy_snowpark_apps.py b/deploy_snowpark_apps.py
new file mode 100644
index 0000000..14ca25c
--- /dev/null
+++ b/deploy_snowpark_apps.py
@@ -0,0 +1,42 @@
+import sys
+import os
+
+ignore_folders = ['__pycache__', '.ipynb_checkpoints']
+
+if len(sys.argv) != 2:
+    print("Root directory is required")
+    sys.exit(1)
+
+root_directory = sys.argv[1]
+print(f"Deploying all Snowpark apps in root directory {root_directory}")
+
+# Walk the entire directory structure recursively
+for (directory_path, directory_names, file_names) in os.walk(root_directory):
+    # Get just the last/final folder name in the directory path
+    base_name = os.path.basename(directory_path)
+
+    # Skip any folders we want to ignore
+    if base_name in ignore_folders:
+#        print(f"Skipping ignored folder {directory_path}")
+        continue
+
+    # An app.toml file in the folder is our indication that this folder contains
+    # a snowcli Snowpark App
+    if "app.toml" not in file_names:
+#        print(f"Skipping non-app folder {directory_path}")
+        continue
+
+    # Next determine what type of app it is
+    app_type = "unknown"
+    if "local_connection.py" in file_names:
+        app_type = "procedure"
+    else:
+        app_type = "function"
+
+    # Finally deploy the app with the snowcli tool
+    print(f"Found {app_type} app in folder {directory_path}")
+    print(f"Calling snowcli to deploy the {app_type} app")
+    os.chdir(f"{directory_path}")
+    # snow login will update the app.toml file with the correct path to the snowsql config file
+    os.system(f"snow login -c {root_directory}/config -C dev")
+    os.system(f"snow {app_type} create")
diff --git a/environment.yml b/environment.yml
new file mode 100644
index 0000000..62f40db
--- /dev/null
+++ b/environment.yml
@@ -0,0 +1,11 @@
+name: snowflake-demo
+channels:
+  - https://repo.anaconda.com/pkgs/snowflake
+  - 
nodefaults +dependencies: + - python=3.10 + - snowflake-snowpark-python + - pip + - pip: + # Snowflake + - snowflake-cli-labs==0.2.9 diff --git a/images/demo_overview.png b/images/demo_overview.png new file mode 100644 index 0000000..2e2f47c Binary files /dev/null and b/images/demo_overview.png differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7364d82 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +snowflake-snowpark-python[pandas] +snowflake-cli-labs==0.2.9 diff --git a/steps/01_overview.md b/steps/01_overview.md new file mode 100644 index 0000000..b7f16c8 --- /dev/null +++ b/steps/01_overview.md @@ -0,0 +1 @@ +Please read and follow the instructions in step 1 in the [Intro to Data Engineering with Snowpark Python](https://quickstarts.snowflake.com/) diff --git a/steps/02_setup_quickstart.md b/steps/02_setup_quickstart.md new file mode 100644 index 0000000..28a7a69 --- /dev/null +++ b/steps/02_setup_quickstart.md @@ -0,0 +1 @@ +Please read and follow the instructions in step 2 in the [Intro to Data Engineering with Snowpark Python](https://quickstarts.snowflake.com/) diff --git a/steps/03_setup_snowflake.sql b/steps/03_setup_snowflake.sql new file mode 100644 index 0000000..4df60d5 --- /dev/null +++ b/steps/03_setup_snowflake.sql @@ -0,0 +1,54 @@ +/*----------------------------------------------------------------------------- +Hands-On Lab: Intro to Data Engineering with Snowpark Python +Script: 03_setup_snowflake.sql +Author: Jeremiah Hansen +Last Updated: 9/26/2023 +-----------------------------------------------------------------------------*/ + + +-- ---------------------------------------------------------------------------- +-- Step #1: Accept Anaconda Terms & Conditions +-- ---------------------------------------------------------------------------- + +-- See Getting Started section in Third-Party Packages (https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-packages.html#getting-started) + + +-- ---------------------------------------------------------------------------- +-- Step #2: Create the account level objects (ACCOUNTADMIN part) +-- ---------------------------------------------------------------------------- +USE ROLE ACCOUNTADMIN; + +-- Roles +SET MY_USER = CURRENT_USER(); +CREATE OR REPLACE ROLE HOL_ROLE; +GRANT ROLE HOL_ROLE TO ROLE SYSADMIN; +GRANT ROLE HOL_ROLE TO USER IDENTIFIER($MY_USER); + +GRANT EXECUTE TASK ON ACCOUNT TO ROLE HOL_ROLE; +GRANT MONITOR EXECUTION ON ACCOUNT TO ROLE HOL_ROLE; +GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE HOL_ROLE; + +-- Databases +CREATE OR REPLACE DATABASE HOL_DB; +GRANT OWNERSHIP ON DATABASE HOL_DB TO ROLE HOL_ROLE; + +-- Warehouses +CREATE OR REPLACE WAREHOUSE HOL_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE; +GRANT OWNERSHIP ON WAREHOUSE HOL_WH TO ROLE HOL_ROLE; + + +-- ---------------------------------------------------------------------------- +-- Step #3: Create the database level objects +-- ---------------------------------------------------------------------------- +USE ROLE HOL_ROLE; +USE WAREHOUSE HOL_WH; +USE DATABASE HOL_DB; + +-- Schemas +CREATE OR REPLACE SCHEMA HOL_SCHEMA; + +-- External Frostbyte objects +USE SCHEMA HOL_SCHEMA; +CREATE OR REPLACE STAGE FROSTBYTE_RAW_STAGE + URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/' +; diff --git a/steps/04_load_weather.sql b/steps/04_load_weather.sql new file mode 100644 index 0000000..82aec7d --- /dev/null +++ b/steps/04_load_weather.sql @@ -0,0 +1,60 @@ 
+/*-----------------------------------------------------------------------------
+Hands-On Lab: Intro to Data Engineering with Snowpark Python
+Script: 04_load_weather.sql
+Author: Jeremiah Hansen
+Last Updated: 9/26/2023
+-----------------------------------------------------------------------------*/
+
+-- SNOWFLAKE ADVANTAGE: Data sharing/marketplace (instead of ETL)
+-- SNOWFLAKE ADVANTAGE: Visual Studio Code Snowflake native extension (PrPr, Git integration)
+
+
+USE ROLE HOL_ROLE;
+USE WAREHOUSE HOL_WH;
+
+
+-- ----------------------------------------------------------------------------
+-- Step #1: Connect to weather data in Marketplace
+-- ----------------------------------------------------------------------------
+
+/*---
+But what about data that needs constant updating - like the WEATHER data? We would
+need to build a pipeline process to constantly update that data to keep it fresh.
+
+Perhaps a better way to get this external data would be to source it from a trusted
+data supplier. Let them manage the data, keeping it accurate and up to date.
+
+Enter the Snowflake Data Cloud...
+
+Weather Source is a leading provider of global weather and climate data and their
+OnPoint Product Suite provides businesses with the necessary weather and climate data
+to quickly generate meaningful and actionable insights for a wide range of use cases
+across industries. Let's connect to the "Weather Source LLC: frostbyte" feed from
+Weather Source in the Snowflake Data Marketplace by following these steps:
+
+    -> Snowsight Home Button
+         -> Marketplace
+             -> Search: "Weather Source LLC: frostbyte" (and click on tile in results)
+                 -> Click the blue "Get" button
+                     -> Under "Options", adjust the Database name to read "FROSTBYTE_WEATHERSOURCE" (all capital letters)
+                         -> Grant to "HOL_ROLE"
+
+That's it... we don't have to do anything from here to keep this data updated.
+The provider will do that for us and data sharing means we are always seeing
+whatever they have published.
+
+
+-- You can also do it via code if you know the account/share details...
+SET WEATHERSOURCE_ACCT_NAME = '*** PUT ACCOUNT NAME HERE AS PART OF DEMO SETUP ***';
+SET WEATHERSOURCE_SHARE_NAME = '*** PUT ACCOUNT SHARE HERE AS PART OF DEMO SETUP ***';
+SET WEATHERSOURCE_SHARE = $WEATHERSOURCE_ACCT_NAME || '.' 
|| $WEATHERSOURCE_SHARE_NAME;
+
+CREATE OR REPLACE DATABASE FROSTBYTE_WEATHERSOURCE
+    FROM SHARE IDENTIFIER($WEATHERSOURCE_SHARE);
+
+GRANT IMPORTED PRIVILEGES ON DATABASE FROSTBYTE_WEATHERSOURCE TO ROLE HOL_ROLE;
+---*/
+
+
+-- Let's look at the data - same 3-part naming convention as any other table
+SELECT * FROM FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES LIMIT 100;
diff --git a/steps/05_load_excel_files.sql b/steps/05_load_excel_files.sql
new file mode 100644
index 0000000..094f4c5
--- /dev/null
+++ b/steps/05_load_excel_files.sql
@@ -0,0 +1,69 @@
+/*-----------------------------------------------------------------------------
+Hands-On Lab: Intro to Data Engineering with Snowpark Python
+Script: 05_load_excel_files.sql
+Author: Jeremiah Hansen
+Last Updated: 9/26/2023
+-----------------------------------------------------------------------------*/
+
+USE ROLE HOL_ROLE;
+USE WAREHOUSE HOL_WH;
+USE SCHEMA HOL_DB.HOL_SCHEMA;
+
+
+-- ----------------------------------------------------------------------------
+-- Step 1: Inspect the Excel files
+-- ----------------------------------------------------------------------------
+
+-- Make sure the two Excel files show up in the stage
+LIST @FROSTBYTE_RAW_STAGE/intro;
+
+-- I've also included a copy of the Excel files in the /data folder of this repo
+-- so you can look at the contents.
+
+
+-- ----------------------------------------------------------------------------
+-- Step 2: Create the stored procedure to load Excel files
+-- ----------------------------------------------------------------------------
+
+CREATE OR REPLACE PROCEDURE LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(file_path string, worksheet_name string, target_table string)
+RETURNS VARIANT
+LANGUAGE PYTHON
+RUNTIME_VERSION = '3.10'
+PACKAGES = ('snowflake-snowpark-python', 'pandas', 'openpyxl')
+HANDLER = 'main'
+AS
+$$
+from snowflake.snowpark.files import SnowflakeFile
+from openpyxl import load_workbook
+import pandas as pd
+
+def main(session, file_path, worksheet_name, target_table):
+    with SnowflakeFile.open(file_path, 'rb') as f:
+        workbook = load_workbook(f)
+        sheet = workbook[worksheet_name]
+        data = sheet.values
+
+        # Get the first line in file as a header line
+        columns = next(data)[0:]
+        # Create a DataFrame based on the second and subsequent lines of data
+        df = pd.DataFrame(data, columns=columns)
+
+        df2 = session.create_dataframe(df)
+        df2.write.mode("overwrite").save_as_table(target_table)
+
+    return True
+$$;
+
+
+-- ----------------------------------------------------------------------------
+-- Step 3: Load the Excel data
+-- ----------------------------------------------------------------------------
+
+CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/order_detail.xlsx'), 'order_detail', 'ORDER_DETAIL');
+CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/location.xlsx'), 'location', 'LOCATION');
+
+DESCRIBE TABLE ORDER_DETAIL;
+SELECT * FROM ORDER_DETAIL;
+
+DESCRIBE TABLE LOCATION;
+SELECT * FROM LOCATION;
diff --git a/steps/06_load_daily_city_metrics.sql b/steps/06_load_daily_city_metrics.sql
new file mode 100644
index 0000000..077b08b
--- /dev/null
+++ b/steps/06_load_daily_city_metrics.sql
@@ -0,0 +1,81 @@
+/*-----------------------------------------------------------------------------
+Hands-On Lab: Intro to Data Engineering with Snowpark Python
+Script: 06_load_daily_city_metrics.sql
+Author: Jeremiah Hansen
+Last Updated: 9/26/2023 
+-----------------------------------------------------------------------------*/ + +USE ROLE HOL_ROLE; +USE WAREHOUSE HOL_WH; +USE SCHEMA HOL_DB.HOL_SCHEMA; + + +-- ---------------------------------------------------------------------------- +-- Step 1: Create the stored procedure to load DAILY_CITY_METRICS +-- ---------------------------------------------------------------------------- + +CREATE OR REPLACE PROCEDURE LOAD_DAILY_CITY_METRICS_SP() +RETURNS VARIANT +LANGUAGE PYTHON +RUNTIME_VERSION = '3.10' +PACKAGES = ('snowflake-snowpark-python') +HANDLER = 'main' +AS +$$ +from snowflake.snowpark import Session +import snowflake.snowpark.functions as F + +def table_exists(session, schema='', name=''): + exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS'] + return exists + +def main(session: Session) -> str: + schema_name = "HOL_SCHEMA" + table_name = "DAILY_CITY_METRICS" + + # Define the tables + order_detail = session.table("ORDER_DETAIL") + history_day = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY") + location = session.table("LOCATION") + + # Join the tables + order_detail = order_detail.join(location, order_detail['LOCATION_ID'] == location['LOCATION_ID']) + order_detail = order_detail.join(history_day, (F.builtin("DATE")(order_detail['ORDER_TS']) == history_day['DATE_VALID_STD']) & (location['ISO_COUNTRY_CODE'] == history_day['COUNTRY']) & (location['CITY'] == history_day['CITY_NAME'])) + + # Aggregate the data + final_agg = order_detail.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('ISO_COUNTRY_CODE')) \ + .agg( \ + F.sum('PRICE').alias('DAILY_SALES_SUM'), \ + F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"), \ + F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"), \ + ) \ + .select(F.col("DATE_VALID_STD").alias("DATE"), F.col("CITY_NAME"), F.col("ISO_COUNTRY_CODE").alias("COUNTRY_DESC"), \ + F.builtin("ZEROIFNULL")(F.col("DAILY_SALES_SUM")).alias("DAILY_SALES"), \ + F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"), \ + F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \ + ) + + # If the table doesn't exist then create it + if not table_exists(session, schema=schema_name, name=table_name): + final_agg.write.mode("overwrite").save_as_table(table_name) + + return f"Successfully created {table_name}" + # Otherwise update it + else: + cols_to_update = {c: final_agg[c] for c in final_agg.schema.names} + + dcm = session.table(f"{schema_name}.{table_name}") + dcm.merge(final_agg, (dcm['DATE'] == final_agg['DATE']) & (dcm['CITY_NAME'] == final_agg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == final_agg['COUNTRY_DESC']), \ + [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)]) + + return f"Successfully updated {table_name}" +$$; + + +-- ---------------------------------------------------------------------------- +-- Step 2: Load the DAILY_CITY_METRICS table +-- ---------------------------------------------------------------------------- + +CALL LOAD_DAILY_CITY_METRICS_SP(); + +SELECT * FROM DAILY_CITY_METRICS; diff --git a/steps/07_deploy_task_dag.py b/steps/07_deploy_task_dag.py new file mode 100644 index 0000000..4e14b13 --- /dev/null +++ b/steps/07_deploy_task_dag.py @@ -0,0 +1,122 @@ +#------------------------------------------------------------------------------ +# Hands-On Lab: Intro to Data Engineering with Snowpark 
Python +# Script: 07_deploy_task_dag.py +# Author: Jeremiah Hansen +# Last Updated: 9/26/2023 +#------------------------------------------------------------------------------ + +from datetime import timedelta + +#from snowflake.connector import connect +from snowflake.snowpark import Session +from snowflake.snowpark import functions as F + +from snowflake.core import Root +from snowflake.core.task import StoredProcedureCall, Task +from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask + + +# Alternative way to create the tasks +def create_tasks_procedurally(session: Session) -> str: + database_name = "HOL_DB" + schema_name = "HOL_SCHEMA" + warehouse_name = "HOL_WH" + + api_root = Root(session) + schema = api_root.databases[database_name].schemas[schema_name] + tasks = schema.tasks + + # Define the tasks + task1_entity = Task( + "LOAD_ORDER_DETAIL_TASK", + definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/order_detail.xlsx'), 'order_detail', 'ORDER_DETAIL')", + warehouse=warehouse_name + ) + task2_entity = Task( + "LOAD_LOCATION_TASK", + definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/location.xlsx'), 'location', 'LOCATION')", + warehouse=warehouse_name + ) + task3_entity = Task( + "LOAD_DAILY_CITY_METRICS_TASK", + definition="CALL LOAD_DAILY_CITY_METRICS_SP()", + warehouse=warehouse_name + ) + task2_entity.predecessors = [task1_entity.name] + task3_entity.predecessors = [task2_entity.name] + + # Create the tasks in Snowflake + task1 = tasks.create(task1_entity, mode="orReplace") + task2 = tasks.create(task2_entity, mode="orReplace") + task3 = tasks.create(task3_entity, mode="orReplace") + + # List the tasks in Snowflake + for t in tasks.iter(like="%task"): + print(f"Definition of {t.name}: \n\n", t.name, t.definition, sep="", end="\n\n--------------------------\n\n") + + task1.execute() + +# task1.get_current_graphs() + +# task1.suspend() +# task2.suspend() +# task3.suspend() +# task3.delete() +# task2.delete() +# task1.delete() + + +# Create the tasks using the DAG API +def main(session: Session) -> str: + database_name = "HOL_DB" + schema_name = "HOL_SCHEMA" + warehouse_name = "HOL_WH" + + api_root = Root(session) + schema = api_root.databases[database_name].schemas[schema_name] + tasks = schema.tasks + dag_op = DAGOperation(schema) + + # Define the DAG + dag_name = "HOL_DAG" + dag = DAG(dag_name, schedule=timedelta(days=1), warehouse=warehouse_name) + with dag: + dag_task1 = DAGTask("LOAD_ORDER_DETAIL_TASK", definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/order_detail.xlsx'), 'order_detail', 'ORDER_DETAIL')", warehouse=warehouse_name) + dag_task2 = DAGTask("LOAD_LOCATION_TASK", definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/location.xlsx'), 'location', 'LOCATION')", warehouse=warehouse_name) + dag_task3 = DAGTask("LOAD_DAILY_CITY_METRICS_TASK", definition="CALL LOAD_DAILY_CITY_METRICS_SP()", warehouse=warehouse_name) + + dag_task3 >> dag_task1 + dag_task3 >> dag_task2 + + # Create the DAG in Snowflake + dag_op.deploy(dag, mode="orreplace") + + dagiter = dag_op.iter_dags(like='hol_dag%') + for dag_name in dagiter: + print(dag_name) + + dag_op.run(dag) + +# dag_op.delete(dag) + + return f"Successfully created and started the DAG" + + +# For local debugging +# Be aware you may need to type-convert arguments if you add input parameters +if __name__ == '__main__': + import 
os, sys + # Add the utils package to our path and import the snowpark_utils function + current_dir = os.getcwd() + parent_dir = os.path.dirname(current_dir) + sys.path.append(parent_dir) + + from utils import snowpark_utils + session = snowpark_utils.get_snowpark_session() + + if len(sys.argv) > 1: + print(main(session, *sys.argv[1:])) # type: ignore + else: + print(main(session)) # type: ignore + + session.close() diff --git a/steps/08_monitor_task_dag.md b/steps/08_monitor_task_dag.md new file mode 100644 index 0000000..449cbd0 --- /dev/null +++ b/steps/08_monitor_task_dag.md @@ -0,0 +1 @@ +Please read and follow the instructions in step 8 in the [Intro to Data Engineering with Snowpark Python](https://quickstarts.snowflake.com/) diff --git a/steps/09_teardown.sql b/steps/09_teardown.sql new file mode 100644 index 0000000..b950f0b --- /dev/null +++ b/steps/09_teardown.sql @@ -0,0 +1,16 @@ +/*----------------------------------------------------------------------------- +Hands-On Lab: Intro to Data Engineering with Snowpark Python +Script: 09_teardown.sql +Author: Jeremiah Hansen +Last Updated: 9/26/2023 +-----------------------------------------------------------------------------*/ + + +USE ROLE ACCOUNTADMIN; + +DROP DATABASE HOL_DB; +DROP WAREHOUSE HOL_WH; +DROP ROLE HOL_ROLE; + +-- Drop the weather share +DROP DATABASE FROSTBYTE_WEATHERSOURCE; diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/snowpark_utils.py b/utils/snowpark_utils.py new file mode 100644 index 0000000..c9fd8e8 --- /dev/null +++ b/utils/snowpark_utils.py @@ -0,0 +1,84 @@ +from snowflake.snowpark import Session +import os +from typing import Optional + +# Class to store a singleton connection option +class SnowflakeConnection(object): + _connection = None + + @property + def connection(self) -> Optional[Session]: + return type(self)._connection + + @connection.setter + def connection(self, val): + type(self)._connection = val + +# Function to return a configured Snowpark session +def get_snowpark_session() -> Session: + # if running in snowflake + if SnowflakeConnection().connection: + # Not sure what this does? + session = SnowflakeConnection().connection + # if running locally with a config file + # TODO: Look for a creds.json style file. This should be the way all snowpark + # related tools work IMO + # if using snowsql config, like snowcli does + elif os.path.exists(os.path.expanduser('~/.snowsql/config')): + snowpark_config = get_snowsql_config() + SnowflakeConnection().connection = Session.builder.configs(snowpark_config).create() + # otherwise configure from environment variables + elif "SNOWSQL_ACCOUNT" in os.environ: + snowpark_config = { + "account": os.environ["SNOWSQL_ACCOUNT"], + "user": os.environ["SNOWSQL_USER"], + "password": os.environ["SNOWSQL_PWD"], + "role": os.environ["SNOWSQL_ROLE"], + "warehouse": os.environ["SNOWSQL_WAREHOUSE"], + "database": os.environ["SNOWSQL_DATABASE"], + "schema": os.environ["SNOWSQL_SCHEMA"] + } + SnowflakeConnection().connection = Session.builder.configs(snowpark_config).create() + + if SnowflakeConnection().connection: + return SnowflakeConnection().connection # type: ignore + else: + raise Exception("Unable to create a Snowpark session") + + +# Mimic the snowcli logic for getting config details, but skip the app.toml processing +# since this will be called outside the snowcli app context. 
+# TODO: It would be nice to get rid of this entirely and always use creds.json but +# need to update snowcli to make that happen +def get_snowsql_config( + connection_name: str = 'dev', + config_file_path: str = os.path.expanduser('~/.snowsql/config'), +) -> dict: + import configparser + + snowsql_to_snowpark_config_mapping = { + 'account': 'account', + 'accountname': 'account', + 'username': 'user', + 'password': 'password', + 'rolename': 'role', + 'warehousename': 'warehouse', + 'dbname': 'database', + 'schemaname': 'schema' + } + try: + config = configparser.ConfigParser(inline_comment_prefixes="#") + connection_path = 'connections.' + connection_name + + config.read(config_file_path) + session_config = config[connection_path] + # Convert snowsql connection variable names to snowcli ones + session_config_dict = { + snowsql_to_snowpark_config_mapping[k]: v.strip('"') + for k, v in session_config.items() + } + return session_config_dict + except Exception: + raise Exception( + "Error getting snowsql config details" + )