Commit

test

ISL-0111 committed Nov 17, 2024
1 parent d9b3ce7 commit b0d9548
Showing 11 changed files with 336 additions and 332 deletions.
44 changes: 29 additions & 15 deletions .github/workflows/cicd.yml
@@ -1,19 +1,33 @@
-name: CICD
-
-on: [push]
-
-
+name: Databricks CI/CD Pipeline
+
+on:
+  push:
+    branches:
+      - main
 jobs:
-  build:
+  build-and-deploy:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-      - name: install packages
-        run: make install
-      - name: lint
-        run: make lint
-      - name: test
-        run: make test
-      - name: format
-        run: make format
+      - name: Checkout Repository
+        uses: actions/checkout@v3
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: 3.8
+
+      - name: Install Databricks CLI
+        run: pip install databricks-cli
+
+      - name: Configure Databricks CLI
+        env:
+          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
+          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
+        run: |
+          echo "[DEFAULT]" > ~/.databrickscfg
+          echo "host = $DATABRICKS_HOST" >> ~/.databrickscfg
+          echo "token = $DATABRICKS_TOKEN" >> ~/.databrickscfg
+      - name: Deploy Notebooks to Databricks
+        run: |
+          databricks workspace import_dir ./notebooks /Workspace/YourFolder --overwrite
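
The deploy job above writes ~/.databrickscfg from repository secrets and pushes ./notebooks with the legacy databricks-cli. A quick way to confirm that the secrets and target path are wired up correctly is a small post-deploy check; the sketch below uses the separate databricks-sdk package (not installed by this workflow) and the placeholder /Workspace/YourFolder path, so treat both as assumptions rather than part of the commit:

# smoke_check.py: optional, hypothetical post-deploy check (pip install databricks-sdk)
import os

from databricks.sdk import WorkspaceClient

# Reuse the same secrets the workflow exports as environment variables.
client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

# List whatever the deploy step imported under the placeholder folder.
for item in client.workspace.list("/Workspace/YourFolder"):
    print(item.path, item.object_type)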
5 changes: 1 addition & 4 deletions Makefile
@@ -6,10 +6,7 @@ format:
 	black *.py
 
 lint:
-	#disable comment to test speed
-	#pylint --disable=R,C --ignore-patterns=test_.*?py *.py mylib/*.py
-	#ruff linting is 10-100X faster than pylint
-	ruff check *.py mylib/*.py
+	pylint check *.py mylib/*.py
 
 test:
 	python -m pytest -vv -cov=mylib test_*.py
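
The test target still collects test_*.py files, none of which are among the files loaded on this page. A minimal sketch of such a test for the new pipeline, assuming main.py imports the four stage functions exactly as shown in the main.py diff below; the file name and assertions are illustrative only:

# Hypothetical test_main.py, matching the Makefile's test_*.py glob; the patch targets
# assume main.py imports extract, transform_data, load_data, filter_and_save_data.
from unittest.mock import patch

import main


def test_main_calls_every_stage():
    with patch("main.extract") as extract_mock, \
         patch("main.transform_data") as transform_mock, \
         patch("main.load_data") as load_mock, \
         patch("main.filter_and_save_data") as filter_mock:
        main.main()

    assert extract_mock.called
    assert transform_mock.called
    assert load_mock.called
    assert filter_mock.called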
64 changes: 25 additions & 39 deletions main.py
@@ -1,51 +1,37 @@
"""
Main entry point for HR_Attrition analysis
"""

from mylib.lib import (
start_spark,
end_spark,
extract,
load_data,
describe,
example_transform,
log_output,
reset_log, # reset_log
)

from mylib.extract import extract
from mylib.transform import transform_data
from mylib.load import load_data
from mylib.query import filter_and_save_data

def main():
reset_log() # Clear log at the beginning to avoid duplication

extract()

spark = start_spark("HR_Attrition")
"""
Orchestrates the ETL pipeline.
"""
# Database and table configuration
database = "HR_Analytics"
raw_table = "Raw_HR_Data"
transformed_table = "Employee_Attrition_Data"
filtered_table = "Employee_Attrition_Data_Filtered"

df = load_data(spark)
describe(df)
try:
print("Extracting data...")
extract(table_name=raw_table, database=database)

run_query(spark, df, "Yes")
run_query(spark, df, "No")
print("Transforming data...")
transform_data(database, raw_table, transformed_table)

example_transform(df)
end_spark(spark)
print("Loading data...")
load_data(database, transformed_table)

print("Filtering data and saving to new table...")
filter_and_save_data(database, transformed_table, filtered_table)

def run_query(spark, df, attrition_value):
"""Filter by Attrition value and log the results."""

df.createOrReplaceTempView("HR_Attrition")

query = f"SELECT * FROM HR_Attrition WHERE Attrition = '{attrition_value}'"
result = spark.sql(query)

log_output(
f"Results for Attrition = '{attrition_value}'",
result.toPandas().to_markdown(),
query,
)
result.show(truncate=False)
except Exception as e:
print(f"ETL pipeline failed: {e}")
raise


if __name__ == "__main__":
main()
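
The rewritten main() also calls filter_and_save_data from mylib/query.py, which is not among the files loaded on this page. Below is a minimal sketch of what that helper might look like, assuming it follows the same Delta-table pattern as mylib/extract.py further down and keeps the old Attrition filter; the body is an illustration, not the committed code:

# Hypothetical sketch of mylib/query.py (the committed module is not shown above).
from pyspark.sql import SparkSession


def filter_and_save_data(database, source_table, target_table):
    """Filter the transformed table and save the result as a new Delta table."""
    spark = SparkSession.builder.getOrCreate()

    # Assumed filter: keep only employees flagged as having left.
    filtered_df = spark.sql(
        f"SELECT * FROM {database}.{source_table} WHERE Attrition = 'Yes'"
    )

    filtered_df.write.format("delta").mode("overwrite").saveAsTable(
        f"{database}.{target_table}"
    )
    print(f"Filtered data saved to {database}.{target_table}")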

100 changes: 0 additions & 100 deletions mylib/ETL.py

This file was deleted.

49 changes: 49 additions & 0 deletions mylib/extract.py
@@ -0,0 +1,49 @@
"""
Module for extracting data from an external source, cleaning it, and saving to a Delta table.
"""

# Standard library imports
import re

# Third-party library imports
import pandas as pd
from pyspark.sql import SparkSession

def extract(table_name, database="HR_Analytics"):
"""
Extract data from a predefined URL, clean, and save to Delta table.
Args:
table_name (str): Name of the Delta table to create.
database (str): Databricks database.
Returns:
None
"""
# Define the URL for the dataset
url = (
"https://raw.githubusercontent.com/nogibjj/"
"Mini_PJT_8_Transitioning_from_Python_to_Rust_ISL/main/HR_1.csv"
)

# Initialize SparkSession (Databricks environment)
spark = SparkSession.builder.getOrCreate()

# Load CSV using pandas
hr_dataframe = pd.read_csv(url)
hr_dataframe.columns = [
re.sub(r"[^\w]", "_", col.strip()).replace("__", "_")
for col in hr_dataframe.columns
]

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(hr_dataframe)

# Ensure database exists
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")

# Write to Delta table
spark_df.write.format("delta").mode("overwrite").saveAsTable(
f"{database}.{table_name}"
)
print(f"Data successfully extracted and saved to {database}.{table_name}")
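
The list comprehension above strips each header, replaces every non-word character with an underscore, and collapses doubled underscores, so a header like "Age (Years)" becomes "Age_Years_". A standalone check of just that cleanup step (the sample headers are invented for illustration):

# Illustration of the column-name cleanup used in extract(); sample headers are made up.
import re


def clean_column(col):
    return re.sub(r"[^\w]", "_", col.strip()).replace("__", "_")


samples = [" Monthly Income ", "Age (Years)", "Attrition"]
print([clean_column(col) for col in samples])
# ['Monthly_Income', 'Age_Years_', 'Attrition']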
