Awsdevops #1

Open · wants to merge 6 commits into `main`
24 changes: 24 additions & 0 deletions Pipeline/.env.sample
@@ -0,0 +1,24 @@
# .env file
SQL_SERVER_CONN_ID=sql_server_default
SNOWFLAKE_CONN_ID=snowflake_default
# The table name in Snowflake where data will be loaded (if needed)
SNOWFLAKE_TABLE=your_snowflake_table
# The table name in SQL Server from which data will be fetched
SQL_SERVER_TABLE=your_sql_server_table

# If you are not using Airflow's connection management system, you would define the connection parameters here:
# SQL Server connection parameters
DB_SERVER=your_sql_server
DB_DATABASE=your_database
DB_USERNAME=your_username
DB_PASSWORD=your_password
DB_DRIVER={ODBC Driver 17 for SQL Server}

# Snowflake connection parameters (if you are going to connect directly without Airflow hooks)
SNOWFLAKE_ACCOUNT=your_snowflake_account
SNOWFLAKE_USER=your_snowflake_username
SNOWFLAKE_PASSWORD=your_snowflake_password
SNOWFLAKE_WAREHOUSE=your_snowflake_warehouse
SNOWFLAKE_DATABASE=your_snowflake_database
SNOWFLAKE_SCHEMA=your_snowflake_schema
SNOWFLAKE_ROLE=your_snowflake_role
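
For reference, a minimal sketch of how these variables could be consumed when connecting directly, without Airflow's connection management; the variable names match the sample above, and everything else is illustrative:

```python
# direct_connect.py -- illustrative sketch, not part of this PR
import os

import pyodbc
import snowflake.connector
from dotenv import load_dotenv

load_dotenv()  # read the .env file into the process environment

# Build a pyodbc connection string from the SQL Server variables above
sql_conn = pyodbc.connect(
    f"DRIVER={os.getenv('DB_DRIVER')};"
    f"SERVER={os.getenv('DB_SERVER')};"
    f"DATABASE={os.getenv('DB_DATABASE')};"
    f"UID={os.getenv('DB_USERNAME')};"
    f"PWD={os.getenv('DB_PASSWORD')}"
)

# Open a Snowflake session from the Snowflake variables above
sf_conn = snowflake.connector.connect(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema=os.getenv("SNOWFLAKE_SCHEMA"),
    role=os.getenv("SNOWFLAKE_ROLE"),
)
```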
126 changes: 126 additions & 0 deletions Pipeline/Overview.ipynb
@@ -0,0 +1,126 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# notebook.py\n",
"\n",
"import pyodbc\n",
"import pandas as pd\n",
"import numpy as np\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# # Database connection parameters\n",
"# server = 'your_server'\n",
"# database = 'your_database'\n",
"# username = 'your_username'\n",
"# password = 'your_password'\n",
"# table_name = 'your_table'\n",
"\n",
"# # Create a connection string\n",
"# conn_str = (\n",
"# f'DRIVER={{SQL Server}};'\n",
"# f'SERVER={server};'\n",
"# f'DATABASE={database};'\n",
"# f'UID={username};'\n",
"# f'PWD={password}'\n",
"# )\n",
"\n",
"# # Connect to the database\n",
"# conn = pyodbc.connect(conn_str)\n",
"\n",
"# # Query the database and load the data into a DataFrame\n",
"# query = f'SELECT * FROM {table_name}'\n",
"# df = pd.read_sql(query, conn)\n",
"\n",
"# # Close the connection\n",
"# conn.close()\n",
"\n",
"# # Display the first few rows of the DataFrame\n",
"# print(df.head())\n",
"\n",
"# # Display the data structure and types\n",
"# print(df.info())\n",
"\n",
"# # Display summary statistics\n",
"# print(df.describe())\n",
"\n",
"\n",
"import os\n",
"from dotenv import load_dotenv\n",
"from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook\n",
"import pandas as pd\n",
"import numpy as np\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# Load environment variables from .env file\n",
"load_dotenv()\n",
"\n",
"# Retrieve connection ID for SQL Server from environment variables\n",
"SQL_SERVER_CONN_ID = os.getenv('SQL_SERVER_CONN_ID')\n",
"\n",
"# Using the connection ID to get connection parameters from Airflow\n",
"sql_server_hook = MsSqlHook(mssql_conn_id=SQL_SERVER_CONN_ID)\n",
"\n",
"# Sample usage for establishing a connection and fetching data into a Pandas DataFrame\n",
"def fetch_data_from_sql_server():\n",
" conn = sql_server_hook.get_conn()\n",
" cursor = conn.cursor()\n",
" query = \"SELECT TOP(1000) * FROM your_table\" # Replace 'your_table' with your actual table name\n",
" df = pd.read_sql(query, conn)\n",
" cursor.close()\n",
" conn.close()\n",
" return df\n",
"\n",
"# Function to perform data analysis\n",
"def analyze_data(df):\n",
" # Display the first few rows of the DataFrame\n",
" print(df.head())\n",
"\n",
" # Display the data structure and types\n",
" print(df.info())\n",
"\n",
" # Display summary statistics for numeric columns\n",
" print(df.describe())\n",
"\n",
" # Display data types and count of non-null values for each column\n",
" print(df.dtypes)\n",
" print(df.count())\n",
"\n",
" # Generate a heatmap for correlations if there are numeric columns\n",
" if df.select_dtypes(include=[np.number]).shape[1] > 1:\n",
" plt.figure(figsize=(10, 8))\n",
" sns.heatmap(df.corr(), annot=True, fmt='.2f', cmap='coolwarm')\n",
" plt.title('Heatmap of Correlation Between Numerical Features')\n",
" plt.show()\n",
"\n",
"# Main execution\n",
"if __name__ == '__main__':\n",
" # Fetch data from SQL Server\n",
" data_frame = fetch_data_from_sql_server()\n",
"\n",
" # Analyze the data\n",
" analyze_data(data_frame)\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
65 changes: 62 additions & 3 deletions Pipeline/connector.py
@@ -1,3 +1,62 @@
import pyodbc
import pymssql
import snowflake.connector
# import pyodbc
# import pymssql
# import snowflake.connector

# connector.py with Airflow hooks
import os
import pandas as pd
import snowflake.connector
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from snowflake.connector.pandas_tools import write_pandas

# Environment variables for sensitive information
SQL_SERVER_CONN_ID = os.getenv('SQL_SERVER_CONN_ID')
SNOWFLAKE_CONN_ID = os.getenv('SNOWFLAKE_CONN_ID')
SNOWFLAKE_TABLE = os.getenv('SNOWFLAKE_TABLE')

def get_data_from_sql_server(query):
    """
    Connect to SQL Server via Airflow's MsSqlHook and return the query
    result as a pandas DataFrame.
    """
    mssql_hook = MsSqlHook(mssql_conn_id=SQL_SERVER_CONN_ID)
    df = mssql_hook.get_pandas_df(sql=query)
    return df


def load_data_to_snowflake(df):
    """
    Load a DataFrame into Snowflake via Airflow's SnowflakeHook, using a
    row-by-row MERGE so that reruns are idempotent.
    """
    snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
    connection = snowflake_hook.get_conn()
    cursor = connection.cursor()

    # Assumes the target table has an ID column that uniquely identifies
    # records; the '...' placeholders must be replaced with the actual
    # column lists before this runs.
    for index, row in df.iterrows():
        merge_sql = f"""
        MERGE INTO {SNOWFLAKE_TABLE} T
        USING (SELECT {row['ID']} AS ID, ...) S
        ON T.ID = S.ID
        WHEN MATCHED THEN UPDATE SET ...
        WHEN NOT MATCHED THEN INSERT (ID, ...) VALUES (S.ID, ...);
        """
        cursor.execute(merge_sql)

    cursor.close()
    connection.close()

# Run the ETL process end to end
def run_etl(query):
    df = get_data_from_sql_server(query)
    if not df.empty:
        load_data_to_snowflake(df)

# Entry point for the script
if __name__ == "__main__":
    # Define your SQL Server query
    sql_query = "SELECT * FROM my_table1 WHERE condition='new_data'"

    # Run the ETL process
    run_etl(sql_query)
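
A note on the loop above: issuing one MERGE per row means one network round trip per record, which will be slow for large frames. Since `write_pandas` is already imported, a set-based alternative is to stage the frame and merge once. A minimal sketch, assuming a staging table named `STAGING_TABLE` and a single payload column `C1` (both hypothetical, not part of this PR):

```python
# Illustrative alternative to the per-row loop: stage, then one MERGE.
# STAGING_TABLE and column C1 are hypothetical placeholders.
def load_data_to_snowflake_staged(df):
    snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
    connection = snowflake_hook.get_conn()
    try:
        # Bulk-upload the DataFrame into the staging table in one call
        write_pandas(connection, df, "STAGING_TABLE", auto_create_table=True)

        # One set-based MERGE replaces len(df) round trips
        connection.cursor().execute(f"""
            MERGE INTO {SNOWFLAKE_TABLE} T
            USING STAGING_TABLE S
            ON T.ID = S.ID
            WHEN MATCHED THEN UPDATE SET T.C1 = S.C1
            WHEN NOT MATCHED THEN INSERT (ID, C1) VALUES (S.ID, S.C1)
        """)
    finally:
        connection.close()
```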
24 changes: 24 additions & 0 deletions Pipeline/tests/test_connector.py
@@ -0,0 +1,24 @@
# test_connector.py

import unittest
from unittest.mock import patch, MagicMock
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

class TestSnowflakeHook(unittest.TestCase):
    @patch('airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.get_conn')
    def test_snowflake_hook_connection(self, mock_get_conn):
        # Mock the get_conn method
        mock_get_conn.return_value = MagicMock()

        # Instantiate the SnowflakeHook
        hook = SnowflakeHook(snowflake_conn_id='MY_SNOWFLAKE_CONN_ID')

        # Call the (patched) get_conn method
        conn = hook.get_conn()

        # Assert the connection was requested exactly once and the mock was returned
        mock_get_conn.assert_called_once()
        self.assertIsInstance(conn, MagicMock)

if __name__ == '__main__':
    unittest.main()
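
The test above only verifies that the patched hook hands back the mock; a sketch of a test that exercises `connector.py`'s own branching logic, assuming the module is importable as `Pipeline.connector` (the import path is an assumption about the package layout):

```python
# test_run_etl.py -- illustrative sketch; 'Pipeline.connector' is an assumed
# import path for the module added in this PR.
import unittest
from unittest.mock import patch

import pandas as pd

from Pipeline import connector


class TestRunEtl(unittest.TestCase):
    @patch('Pipeline.connector.load_data_to_snowflake')
    @patch('Pipeline.connector.get_data_from_sql_server')
    def test_empty_frame_skips_load(self, mock_fetch, mock_load):
        # An empty result set should never trigger a Snowflake load
        mock_fetch.return_value = pd.DataFrame()
        connector.run_etl("SELECT 1")
        mock_load.assert_not_called()

    @patch('Pipeline.connector.load_data_to_snowflake')
    @patch('Pipeline.connector.get_data_from_sql_server')
    def test_non_empty_frame_is_loaded(self, mock_fetch, mock_load):
        # A non-empty frame should be passed through to the loader
        df = pd.DataFrame({'ID': [1]})
        mock_fetch.return_value = df
        connector.run_etl("SELECT 1")
        mock_load.assert_called_once_with(df)


if __name__ == '__main__':
    unittest.main()
```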
26 changes: 26 additions & 0 deletions README.md
@@ -1,2 +1,28 @@
# Eplus Project
This is a data warehousing project that will take one of three routes: Microsoft Fabric for the entire stack; SSIS as the data warehousing tool; or AWS for the data pipelines, with Snowflake as the warehouse and Power BI as the front end.



## References
To gain a deeper understanding of the `connector.py` module, which uses Apache Airflow hooks for Snowflake and Microsoft SQL Server as well as the Snowflake Python connector, refer to the following official documentation and resources:

### Apache Airflow Documentation

Resources for using these tools effectively with the `connector.py` module so that the data pipelines stay robust, secure, and maintainable; a minimal DAG sketch follows the links below.

- **Airflow Documentation**: The main landing page for Apache Airflow documentation, which provides a comprehensive overview of concepts, usage, and API references.
[Apache Airflow Documentation](https://airflow.apache.org/docs/)

- **SnowflakeHook**: Documentation specific to the SnowflakeHook, part of the Airflow provider package for Snowflake.
[SnowflakeHook Reference](https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/index.html)

- **MsSqlHook**: Documentation specific to the MsSqlHook, part of the Airflow provider package for Microsoft SQL Server.
[MsSqlHook Reference](https://airflow.apache.org/docs/apache-airflow-providers-microsoft-mssql/stable/index.html)
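
As a rough illustration of the scheduling side, a minimal DAG sketch that calls `run_etl` from the hooks-based `connector.py`; the DAG id, schedule, and `Pipeline.connector` import path are assumptions (Airflow 2.4+ syntax):

```python
# dags/mssql_to_snowflake.py -- illustrative sketch only
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from Pipeline.connector import run_etl  # assumed import path

with DAG(
    dag_id="mssql_to_snowflake",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # 'schedule_interval' on Airflow < 2.4
    catchup=False,
) as dag:
    extract_and_load = PythonOperator(
        task_id="run_etl",
        python_callable=run_etl,
        op_kwargs={"query": "SELECT * FROM my_table1 WHERE condition='new_data'"},
    )
```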

### Snowflake Python Connector Documentation

- **Snowflake Connector for Python**: Official documentation for the Snowflake connector, which includes installation instructions, usage examples, and API details.
[Snowflake Connector for Python Documentation](https://docs.snowflake.com/en/user-guide/python-connector.html)

- **Pandas Tools for Snowflake**: Details about the `write_pandas` function within the Snowflake connector that allows easy DataFrame uploads to Snowflake.
[Pandas Tools for Snowflake](https://docs.snowflake.com/en/user-guide/python-connector-pandas.html)