From b758b9012f8e600cde0c45f3d507b9517205a997 Mon Sep 17 00:00:00 2001
From: peterkuria1
Date: Mon, 6 Nov 2023 14:05:42 +0300
Subject: [PATCH 1/6] Add env sample

---
 Pipeline/.env.sample | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)
 create mode 100644 Pipeline/.env.sample

diff --git a/Pipeline/.env.sample b/Pipeline/.env.sample
new file mode 100644
index 0000000..dfedcd9
--- /dev/null
+++ b/Pipeline/.env.sample
@@ -0,0 +1,24 @@
+# .env file
+SQL_SERVER_CONN_ID=sql_server_default
+SNOWFLAKE_CONN_ID=snowflake_default
+# The Snowflake table into which data will be loaded (if needed)
+SNOWFLAKE_TABLE=your_snowflake_table
+# The SQL Server table from which data will be fetched
+SQL_SERVER_TABLE=your_sql_server_table
+
+# If you are not using Airflow's connection management system, define the connection parameters here:
+# SQL Server connection parameters
+DB_SERVER=your_sql_server
+DB_DATABASE=your_database
+DB_USERNAME=your_username
+DB_PASSWORD=your_password
+DB_DRIVER={ODBC Driver 17 for SQL Server}
+
+# Snowflake connection parameters (if you connect directly without Airflow hooks)
+SNOWFLAKE_ACCOUNT=your_snowflake_account
+SNOWFLAKE_USER=your_snowflake_username
+SNOWFLAKE_PASSWORD=your_snowflake_password
+SNOWFLAKE_WAREHOUSE=your_snowflake_warehouse
+SNOWFLAKE_DATABASE=your_snowflake_database
+SNOWFLAKE_SCHEMA=your_snowflake_schema
+SNOWFLAKE_ROLE=your_snowflake_role

From 594bb8dab350d4355d826e84cd0ec83f9186149a Mon Sep 17 00:00:00 2001
From: peterkuria1
Date: Mon, 6 Nov 2023 14:06:09 +0300
Subject: [PATCH 2/6] Update connector to use Apache Airflow hooks

---
 Pipeline/connector.py | 65 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 3 deletions(-)

diff --git a/Pipeline/connector.py b/Pipeline/connector.py
index e888ea7..9dee543 100644
--- a/Pipeline/connector.py
+++ b/Pipeline/connector.py
@@ -1,3 +1,62 @@
-import pyodbc
-import pymssql
-import snowflake.connector
\ No newline at end of file
+# import pyodbc
+# import pymssql
+# import snowflake.connector
+
+# connector.py with Airflow hooks
+import os
+import pandas as pd
+import snowflake.connector
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
+from snowflake.connector.pandas_tools import write_pandas
+
+# Environment variables for sensitive information
+SQL_SERVER_CONN_ID = os.getenv('SQL_SERVER_CONN_ID')
+SNOWFLAKE_CONN_ID = os.getenv('SNOWFLAKE_CONN_ID')
+SNOWFLAKE_TABLE = os.getenv('SNOWFLAKE_TABLE')
+
+def get_data_from_sql_server(query):
+    """
+    Connect to SQL Server via the Airflow hook and return the query
+    results as a pandas DataFrame.
+    """
+    mssql_hook = MsSqlHook(mssql_conn_id=SQL_SERVER_CONN_ID)
+    df = mssql_hook.get_pandas_df(sql=query)
+    return df
+
+
+def load_data_to_snowflake(df):
+    """
+    Connect to Snowflake via the Airflow hook and upsert the DataFrame
+    into the target table.
+    """
+    snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
+    connection = snowflake_hook.get_conn()
+    cursor = connection.cursor()
+
+    # Load the DataFrame into Snowflake with idempotency checks.
+    # Assumes the table has an ID column that uniquely identifies records;
+    # replace the ... placeholders with the remaining columns.
+    for index, row in df.iterrows():
+        merge_sql = f"""
+        MERGE INTO {SNOWFLAKE_TABLE} T
+        USING (SELECT {row['ID']} AS ID, ...) S
+        ON T.ID = S.ID
+        WHEN MATCHED THEN UPDATE SET ...
+        WHEN NOT MATCHED THEN INSERT (ID, ...)
+        VALUES (S.ID, ...);
+        """
+        cursor.execute(merge_sql)
+
+    cursor.close()
+    connection.close()
+
+# Function to run the ETL process
+def run_etl(query):
+    df = get_data_from_sql_server(query)
+    if not df.empty:
+        load_data_to_snowflake(df)
+
+# Entry point for the script
+if __name__ == "__main__":
+    # Define your SQL Server query
+    sql_query = "SELECT * FROM my_table1 WHERE condition='new_data'"
+
+    # Run the ETL process
+    run_etl(sql_query)

From 71a4f38a591f8e0a78ec44b63b892b46e08b01c1 Mon Sep 17 00:00:00 2001
From: peterkuria1
Date: Mon, 6 Nov 2023 14:07:58 +0300
Subject: [PATCH 3/6] [Docs]: update docs

---
 README.md | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/README.md b/README.md
index b5bcdb4..d0acb1f 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,28 @@
 # Eplus Project
 This is a data warehousing project that takes one of three routes: Microsoft Fabric for the entire stack, SSIS as the data warehousing tool, or AWS for the data pipelines with Snowflake as the warehouse and Power BI as the front end.
+
+
+
+## References
+To gain a deeper understanding of the `connector.py` module, which uses Apache Airflow hooks for Snowflake and Microsoft SQL Server as well as the Snowflake Python connector, refer to the following official documentation and resources:
+
+### Apache Airflow Documentation
+
+Resources for using the following tools effectively with the `connector.py` module and for keeping your data pipelines robust, secure, and maintainable.
+
+- **Airflow Documentation**: The main landing page for Apache Airflow documentation, which provides a comprehensive overview of concepts, usage, and API references.
+  [Apache Airflow Documentation](https://airflow.apache.org/docs/)
+
+- **SnowflakeHook**: Documentation specific to the SnowflakeHook, which is part of the Airflow provider package for Snowflake.
+  [SnowflakeHook Reference](https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/index.html)
+
+- **MsSqlHook**: Documentation specific to the MsSqlHook, which is part of the Airflow provider package for Microsoft SQL Server.
+  [MsSqlHook Reference](https://airflow.apache.org/docs/apache-airflow-providers-microsoft-mssql/stable/index.html)
+
+### Snowflake Python Connector Documentation
+
+- **Snowflake Connector for Python**: Official documentation for the Snowflake connector, including installation instructions, usage examples, and API details.
+  [Snowflake Connector for Python Documentation](https://docs.snowflake.com/en/user-guide/python-connector.html)
+
+- **Pandas Tools for Snowflake**: Details about the `write_pandas` method within the Snowflake connector, which allows for easy DataFrame uploads to Snowflake; a short usage sketch follows below.
+  [Pandas Tools for Snowflake](https://docs.snowflake.com/en/user-guide/python-connector-pandas.html)
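+
+For a quick, hedged illustration of `write_pandas` (the credentials mirror the `.env.sample` placeholders, the sample DataFrame is invented, and the target table is assumed to already exist):
+
+```python
+import pandas as pd
+import snowflake.connector
+from snowflake.connector.pandas_tools import write_pandas
+
+# Placeholder credentials; in this project they live in the .env file
+conn = snowflake.connector.connect(
+    account='your_snowflake_account',
+    user='your_snowflake_username',
+    password='your_snowflake_password',
+    warehouse='your_snowflake_warehouse',
+    database='your_snowflake_database',
+    schema='your_snowflake_schema',
+)
+
+df = pd.DataFrame({'ID': [1, 2], 'VALUE': ['a', 'b']})
+
+# write_pandas stages the DataFrame and bulk-copies it into the existing table
+# (the name is matched case-sensitively by default); it returns a
+# (success, num_chunks, num_rows, output) tuple.
+success, num_chunks, num_rows, _ = write_pandas(conn, df, 'YOUR_SNOWFLAKE_TABLE')
+conn.close()
+```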
\ No newline at end of file

From a7687dca907611db9e299179d1da93dbdee64bf0 Mon Sep 17 00:00:00 2001
From: peterkuria1
Date: Mon, 6 Nov 2023 14:08:21 +0300
Subject: [PATCH 4/6] Add a test notebook

---
 Pipeline/Overview.ipynb | 68 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 68 insertions(+)
 create mode 100644 Pipeline/Overview.ipynb

diff --git a/Pipeline/Overview.ipynb b/Pipeline/Overview.ipynb
new file mode 100644
index 0000000..af1dfb9
--- /dev/null
+++ b/Pipeline/Overview.ipynb
@@ -0,0 +1,68 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Overview notebook: quick exploration of the source table\n",
+    "\n",
+    "import pyodbc\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "import seaborn as sns\n",
+    "import matplotlib.pyplot as plt\n",
+    "\n",
+    "# Database connection parameters\n",
+    "server = 'your_server'\n",
+    "database = 'your_database'\n",
+    "username = 'your_username'\n",
+    "password = 'your_password'\n",
+    "table_name = 'your_table'\n",
+    "\n",
+    "# Create a connection string\n",
+    "conn_str = (\n",
+    "    f'DRIVER={{SQL Server}};'\n",
+    "    f'SERVER={server};'\n",
+    "    f'DATABASE={database};'\n",
+    "    f'UID={username};'\n",
+    "    f'PWD={password}'\n",
+    ")\n",
+    "\n",
+    "# Connect to the database\n",
+    "conn = pyodbc.connect(conn_str)\n",
+    "\n",
+    "# Query the database and load the data into a DataFrame\n",
+    "query = f'SELECT * FROM {table_name}'\n",
+    "df = pd.read_sql(query, conn)\n",
+    "\n",
+    "# Close the connection\n",
+    "conn.close()\n",
+    "\n",
+    "# Display the first few rows of the DataFrame\n",
+    "print(df.head())\n",
+    "\n",
+    "# Display the data structure and types\n",
+    "print(df.info())\n",
+    "\n",
+    "# Display summary statistics\n",
+    "print(df.describe())\n",
+    "\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "base",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "name": "python",
+   "version": "3.10.12"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}

From 373549f5cb807381d53002091abd36eb3633da6a Mon Sep 17 00:00:00 2001
From: peterkuria1
Date: Mon, 6 Nov 2023 14:10:15 +0300
Subject: [PATCH 5/6] Add unittest for the db connector

---
 Pipeline/tests/test_connector.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)
 create mode 100644 Pipeline/tests/test_connector.py

diff --git a/Pipeline/tests/test_connector.py b/Pipeline/tests/test_connector.py
new file mode 100644
index 0000000..9ef252f
--- /dev/null
+++ b/Pipeline/tests/test_connector.py
@@ -0,0 +1,24 @@
+# test_connector.py
+
+import unittest
+from unittest.mock import patch, MagicMock
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+
+class TestSnowflakeHook(unittest.TestCase):
+    @patch('airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.get_conn')
+    def test_snowflake_hook_connection(self, mock_get_conn):
+        # Mock get_conn so the test never opens a real Snowflake connection
+        mock_get_conn.return_value = MagicMock()
+
+        # Instantiate the SnowflakeHook
+        hook = SnowflakeHook(snowflake_conn_id='MY_SNOWFLAKE_CONN_ID')
+
+        # Call the get_conn method
+        conn = hook.get_conn()
+
+        # Assert get_conn was called exactly once and returned the mock
+        mock_get_conn.assert_called_once()
+        self.assertIsInstance(conn, MagicMock)
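+
+
+# Hypothetical companion test, not part of the original commit: the SQL Server
+# side of connector.py can be exercised the same way by mocking the hook
+# method it calls, get_pandas_df.
+class TestMsSqlHook(unittest.TestCase):
+    @patch('airflow.providers.microsoft.mssql.hooks.mssql.MsSqlHook.get_pandas_df')
+    def test_mssql_hook_returns_dataframe(self, mock_get_pandas_df):
+        import pandas as pd
+        from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
+
+        # Stub the DataFrame fetch so no SQL Server instance is required
+        mock_get_pandas_df.return_value = pd.DataFrame({'ID': [1]})
+
+        hook = MsSqlHook(mssql_conn_id='MY_SQL_SERVER_CONN_ID')
+        df = hook.get_pandas_df(sql='SELECT 1')
+
+        mock_get_pandas_df.assert_called_once()
+        self.assertFalse(df.empty)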
+
+if __name__ == '__main__':
+    unittest.main()

From 9bd3ec363205911a40bea90c1d646b3b8daedc87 Mon Sep 17 00:00:00 2001
From: peterkuria1
Date: Mon, 6 Nov 2023 14:12:55 +0300
Subject: [PATCH 6/6] Update notebook

---
 Pipeline/Overview.ipynb | 126 +++++++++++++++++++++++++++++-----------
 1 file changed, 92 insertions(+), 34 deletions(-)

diff --git a/Pipeline/Overview.ipynb b/Pipeline/Overview.ipynb
index af1dfb9..32cbf47 100644
--- a/Pipeline/Overview.ipynb
+++ b/Pipeline/Overview.ipynb
@@ -14,40 +14,98 @@
     "import seaborn as sns\n",
     "import matplotlib.pyplot as plt\n",
     "\n",
-    "# Database connection parameters\n",
-    "server = 'your_server'\n",
-    "database = 'your_database'\n",
-    "username = 'your_username'\n",
-    "password = 'your_password'\n",
-    "table_name = 'your_table'\n",
-    "\n",
-    "# Create a connection string\n",
-    "conn_str = (\n",
-    "    f'DRIVER={{SQL Server}};'\n",
-    "    f'SERVER={server};'\n",
-    "    f'DATABASE={database};'\n",
-    "    f'UID={username};'\n",
-    "    f'PWD={password}'\n",
-    ")\n",
-    "\n",
-    "# Connect to the database\n",
-    "conn = pyodbc.connect(conn_str)\n",
-    "\n",
-    "# Query the database and load the data into a DataFrame\n",
-    "query = f'SELECT * FROM {table_name}'\n",
-    "df = pd.read_sql(query, conn)\n",
-    "\n",
-    "# Close the connection\n",
-    "conn.close()\n",
-    "\n",
-    "# Display the first few rows of the DataFrame\n",
-    "print(df.head())\n",
-    "\n",
-    "# Display the data structure and types\n",
-    "print(df.info())\n",
-    "\n",
-    "# Display summary statistics\n",
-    "print(df.describe())\n",
+    "# # Database connection parameters\n",
+    "# server = 'your_server'\n",
+    "# database = 'your_database'\n",
+    "# username = 'your_username'\n",
+    "# password = 'your_password'\n",
+    "# table_name = 'your_table'\n",
+    "\n",
+    "# # Create a connection string\n",
+    "# conn_str = (\n",
+    "#     f'DRIVER={{SQL Server}};'\n",
+    "#     f'SERVER={server};'\n",
+    "#     f'DATABASE={database};'\n",
+    "#     f'UID={username};'\n",
+    "#     f'PWD={password}'\n",
+    "# )\n",
+    "\n",
+    "# # Connect to the database\n",
+    "# conn = pyodbc.connect(conn_str)\n",
+    "\n",
+    "# # Query the database and load the data into a DataFrame\n",
+    "# query = f'SELECT * FROM {table_name}'\n",
+    "# df = pd.read_sql(query, conn)\n",
+    "\n",
+    "# # Close the connection\n",
+    "# conn.close()\n",
+    "\n",
+    "# # Display the first few rows of the DataFrame\n",
+    "# print(df.head())\n",
+    "\n",
+    "# # Display the data structure and types\n",
+    "# print(df.info())\n",
+    "\n",
+    "# # Display summary statistics\n",
+    "# print(df.describe())\n",
+    "\n",
+    "\n",
+    "# Additional imports for the Airflow-based connection\n",
+    "# (pandas, numpy, seaborn and matplotlib are already imported above)\n",
+    "import os\n",
+    "from dotenv import load_dotenv\n",
+    "from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook\n",
+    "\n",
+    "# Load environment variables from the .env file\n",
+    "load_dotenv()\n",
+    "\n",
+    "# Retrieve the connection ID for SQL Server from the environment\n",
+    "SQL_SERVER_CONN_ID = os.getenv('SQL_SERVER_CONN_ID')\n",
+    "\n",
+    "# Use the connection ID to get connection parameters from Airflow\n",
+    "sql_server_hook = MsSqlHook(mssql_conn_id=SQL_SERVER_CONN_ID)\n",
+    "\n",
+    "# Establish a connection and fetch data into a pandas DataFrame\n",
+    "def fetch_data_from_sql_server():\n",
+    "    conn = sql_server_hook.get_conn()\n",
+    "    query = \"SELECT TOP(1000) * FROM your_table\"  # Replace 'your_table' with your actual table name\n",
+    "    df = pd.read_sql(query, conn)\n",
+    "    conn.close()\n",
+    "    return df\n",
+    "\n",
+    "# Function to perform data analysis\n",
+    "def analyze_data(df):\n",
+    "    # Display the first few rows of the DataFrame\n",
+    "    print(df.head())\n",
"\n", + " # Display the data structure and types\n", + " print(df.info())\n", + "\n", + " # Display summary statistics for numeric columns\n", + " print(df.describe())\n", + "\n", + " # Display data types and count of non-null values for each column\n", + " print(df.dtypes)\n", + " print(df.count())\n", + "\n", + " # Generate a heatmap for correlations if there are numeric columns\n", + " if df.select_dtypes(include=[np.number]).shape[1] > 1:\n", + " plt.figure(figsize=(10, 8))\n", + " sns.heatmap(df.corr(), annot=True, fmt='.2f', cmap='coolwarm')\n", + " plt.title('Heatmap of Correlation Between Numerical Features')\n", + " plt.show()\n", + "\n", + "# Main execution\n", + "if __name__ == '__main__':\n", + " # Fetch data from SQL Server\n", + " data_frame = fetch_data_from_sql_server()\n", + "\n", + " # Analyze the data\n", + " analyze_data(data_frame)\n", "\n" ] }