From 5a2ea38249764bfc974c52c60440268ccaf935b3 Mon Sep 17 00:00:00 2001
From: Leonard Eshun
Date: Fri, 29 Nov 2024 14:02:26 -0500
Subject: [PATCH] Resolving pyspark issue

---
 Test_Log.md  | 107 -------------------
 main.py      | 145 ---------------------------
 pyspark.py   |  35 --------
 test_main.py | 231 --------------------------------------------
 4 files changed, 518 deletions(-)
 delete mode 100644 Test_Log.md
 delete mode 100644 main.py
 delete mode 100644 pyspark.py
 delete mode 100644 test_main.py

diff --git a/Test_Log.md b/Test_Log.md
deleted file mode 100644
index 627f8e9..0000000
--- a/Test_Log.md
+++ /dev/null
@@ -1,107 +0,0 @@
-### Extraction Test ###
-Removing existing CSV file if it exists
-Confirming that CSV file doesn't exist...
-Test Successful
-Extracting data and saving...
-Testing if CSV file exists...
-Extraction Test Successful
-
-
-### Transform and Load Test ###
-Creating non-lookup table: air_quality
-Creating lookup table: indicator
-Creating lookup table: geo_data
-Tables created.
-Skipping the first row...
-Inserting table data...
-Inserting table data completed
-Transform and Load Test Successful
-
-
-### One Record Reading Test ###
-Executing query...
-
-```sql
-select * from air_quality where air_quality_id = 740885
-```
-
-Asserting that row[0][data_value] == 16.4
-Assert Successful
-One Record Reading Test Successful
-
-
-### All Records Reading Test ###
-Executing query...
-
-```sql
-select * from air_quality
-```
-
-Asserting that len(rows) == 18016
-All Records Reading Test Successful
-
-
-### Record Saving Test ###
-Asserting there's no record in geo_data with ID 100000
-Executing query...
-
-```sql
-select * from geo_data where geo_id = 100000
-```
-
-Assert Successful
-Saving new record with ID 100000
-Executing query...
-
-```sql
-SELECT name FROM pragma_table_info('geo_data')
-```
-
-Executing query...
-
-```sql
-INSERT INTO geo_data (geo_id, geo_place_name, geo_type_name) VALUES ('100000', 'Lancaster', 'UFO')
-```
-
-Asserting there's now a record in geo_data with ID 100000
-Executing query...
-
-```sql
-select * from geo_data where geo_id = 100000
-```
-
-Assert Successful
-Record Saving Test Successful
-
-
-### Record Update Test ###
-Asserting 'geo_place_name' in geo_data for row ID 100000 is 'Lancaster'
-Executing query...
-
-```sql
-select * from geo_data where geo_id = 100000
-```
-
-Assert Successful
-Updating 'geo_place_name' in geo_data for row ID 100000 to 'Duke'
-Executing query...
-
-```sql
-UPDATE geo_data SET geo_place_name='Duke' WHERE geo_id = 100000
-```
-
-Asserting 'geo_place_name' in geo_data for row ID 100000 is now 'Duke'
-Executing query...
-
-```sql
-select * from geo_data where geo_id = 100000
-```
-
-Assert Successful
-Record Update Test Successful
-
-
-### Record Deletion Test ###
-Asserting there's a record in geo_data for row ID 100000
-Executing query...
-
-```sql
-select * from geo_data where geo_id = 100000
-```
-
-Assert Successful
-Deleting record with ID 100000
-Executing query...
-
-```sql
-delete from geo_data where geo_id = 100000
-```
-
-Asserting there's no record in geo_data with ID 100000
-Executing query...
-
-```sql
-select * from geo_data where geo_id = 100000
-```
-
-Assert Successful
-Record Deletion Test Successful
-
-
-### Reading All Columns Test ###
-Executing query...
-
-```sql
-SELECT name FROM pragma_table_info('air_quality')
-```
-
-Asserting the air_quality table has six (6) columns
-Assert Successful
-Reading All Columns Test Successful
-
-
-Executing query...
-
-```sql
-select * from geo_data
-```
-
-Executing query...
-
-```sql
-select * from indicator
-```
-
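The log above runs against SQLite, and the `pragma_table_info` queries are how the tests read a table's column names back. A minimal stand-alone sketch of that lookup, assuming only the standard-library `sqlite3` module and the `air_quality.db` file the tests create:

```python
import sqlite3

# Connect to the database the tests build.
conn = sqlite3.connect("air_quality.db")

# pragma_table_info is a SQLite table-valued function that returns one row
# per column, so selecting "name" yields the column names in order.
columns = [row[0] for row in
           conn.execute("SELECT name FROM pragma_table_info('geo_data')")]
# Expected, per the INSERT statement in the log:
# ['geo_id', 'geo_place_name', 'geo_type_name']
print(columns)

conn.close()
```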
diff --git a/main.py b/main.py
deleted file mode 100644
index 0e6f81f..0000000
--- a/main.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import sys
-import argparse
-from my_lib.extract import extract
-from my_lib.load import transform_n_load
-from my_lib.crud import (
-    read_data,
-    read_all_data,
-    save_data,
-    delete_data,
-    update_data,
-    get_table_columns,
-)
-import ast
-
-
-def handle_arguments(args):
-    """Add arguments based on the initial call"""
-    parser = argparse.ArgumentParser(description="DE ETL And Query Script")
-    parser.add_argument(
-        "Functions",
-        choices=[
-            "extract",
-            "transform_n_load",
-            "read_data",
-            "read_all_data",
-            "save_data",
-            "delete_data",
-            "update_data",
-            "get_table_columns",
-        ],
-    )
-
-    first = parser.parse_args(args[:1])
-    print(first.Functions)
-    if first.Functions == "extract":
-        parser.add_argument("url")
-        parser.add_argument("file_name")
-
-    elif first.Functions == "transform_n_load":
-        parser.add_argument("local_dataset")
-        parser.add_argument("database_name")
-        parser.add_argument("new_data_tables")
-        parser.add_argument("new_lookup_tables")
-        parser.add_argument("column_attributes")
-        parser.add_argument("column_map")
-
-    elif first.Functions == "read_data":
-        parser.add_argument("database_name")
-        parser.add_argument("table_name")
-        parser.add_argument("data_id")
-
-    elif first.Functions == "read_all_data":
-        parser.add_argument("database_name")
-        parser.add_argument("table_name")
-
-    elif first.Functions == "save_data":
-        parser.add_argument("database_name")
-        parser.add_argument("table_name")
-        parser.add_argument("row")
-
-    elif first.Functions == "update_data":
-        parser.add_argument("database_name")
-        parser.add_argument("table_name")
-        parser.add_argument("data_id")
-        parser.add_argument("things_to_update")
-
-    elif first.Functions == "delete_data":
-        parser.add_argument("database_name")
-        parser.add_argument("table_name")
-        parser.add_argument("data_id")
-
-    elif first.Functions == "get_table_columns":
-        parser.add_argument("database_name")
-        parser.add_argument("table_name")
-
-    # parse again, now that the chosen function's arguments are registered
-    return parser.parse_args(args)
-
-
-def main():
-    """handles all the cli commands"""
-
-    args = handle_arguments(sys.argv[1:])
-
-    if args.Functions == "extract":
-        print("Extracting data...")
-        print(extract(args.url, args.file_name))
-
-    elif args.Functions == "transform_n_load":
-        print("Transforming and loading data...")
-        print(
-            transform_n_load(
-                args.local_dataset,
-                args.database_name,
-                ast.literal_eval(args.new_data_tables),
-                ast.literal_eval(args.new_lookup_tables),
-                ast.literal_eval(args.column_attributes),
-                ast.literal_eval(args.column_map),
-            )
-        )
-
-    elif args.Functions == "read_data":
-        print(
-            read_data(
-                args.database_name, args.table_name, ast.literal_eval(args.data_id)
-            )
-        )
-
-    elif args.Functions == "read_all_data":
-        print(read_all_data(args.database_name, args.table_name))
-
-    elif args.Functions == "save_data":
-        print(
-            save_data(
-                args.database_name,
-                args.table_name,
-                ast.literal_eval(args.row),
-            )
-        )
-
-    elif args.Functions == "update_data":
-        print(
-            update_data(
-                args.database_name,
-                args.table_name,
-                ast.literal_eval(args.data_id),
-                ast.literal_eval(args.things_to_update),
-            )
-        )
-
-    elif args.Functions == "delete_data":
-        print(
-            delete_data(
-                args.database_name, args.table_name, ast.literal_eval(args.data_id)
-            )
-        )
-
-    elif args.Functions == "get_table_columns":
-        print(get_table_columns(args.database_name, args.table_name))
-
-    else:
-        print(f"Unknown function: {args.Functions}")
-
-
-if __name__ == "__main__":
-    main()
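One detail of `main()` worth calling out: the dict- and list-valued arguments arrive from the shell as strings and are decoded with `ast.literal_eval`, so on the command line they are written as quoted Python literals. A minimal round-trip sketch (the `geo_data` value mirrors the lookup-table schema in test_main.py below):

```python
import ast

# What argparse hands to main() is a plain string; ast.literal_eval turns a
# quoted Python literal back into the dict transform_n_load expects.
raw = "{'geo_data': ['geo_id', 'geo_place_name', 'geo_type_name']}"
new_lookup_tables = ast.literal_eval(raw)

print(new_lookup_tables["geo_data"])
# ['geo_id', 'geo_place_name', 'geo_type_name']
```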
diff --git a/pyspark.py b/pyspark.py
deleted file mode 100644
index 390cb2a..0000000
--- a/pyspark.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# from pyspark.sql import SparkSession
-# from pyspark.sql.functions import avg
-
-# # Initialize SparkSession
-# spark = SparkSession.builder \
-#     .appName("AverageIndicators") \
-#     .getOrCreate()
-
-# # Sample data
-# data = [
-#     (101, 'Area1', '2021-Q1', 50.0),
-#     (101, 'Area1', '2021-Q1', 70.0),
-#     (101, 'Area2', '2021-Q1', 80.0),
-#     (102, 'Area1', '2021-Q1', 60.0),
-#     (102, 'Area2', '2021-Q2', 90.0),
-#     (103, 'Area3', '2021-Q1', 40.0),
-#     (103, 'Area3', '2021-Q2', 50.0),
-#     (103, 'Area3', '2021-Q2', 70.0),
-# ]
-
-# columns = ["fn_indicator_id", "fn_geo_id", "time_period", "data_value"]
-
-# # Create DataFrame
-# df = spark.createDataFrame(data, columns)
-
-# # Calculate the average
-# avg_df = df.groupBy("fn_indicator_id", "fn_geo_id", "time_period") \
-#     .agg(avg("data_value").alias("average_data_value"))
-
-# # Order the results
-# avg_df_ordered = avg_df.orderBy("fn_indicator_id", "fn_geo_id", "time_period")
-# avg_df_ordered.show()
-
-# # Stop SparkSession
-# spark.stop()
\ No newline at end of file
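The commented-out script above computes a per-group mean: the average `data_value` for each (indicator, geo, period) combination. Since the project already loads this data into SQLite, the same aggregation is available without Spark; a sketch against the `air_quality` table that `transform_n_load` builds (column names taken from test_main.py below):

```python
import sqlite3

conn = sqlite3.connect("air_quality.db")
# GROUP BY reproduces the PySpark groupBy().agg(avg(...)) above, and
# ORDER BY mirrors the orderBy() call.
for row in conn.execute(
    """
    SELECT fn_indicator_id, fn_geo_id, time_period,
           AVG(data_value) AS average_data_value
    FROM air_quality
    GROUP BY fn_indicator_id, fn_geo_id, time_period
    ORDER BY fn_indicator_id, fn_geo_id, time_period
    """
):
    print(row)
conn.close()
```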
diff --git a/test_main.py b/test_main.py
deleted file mode 100644
index b168a36..0000000
--- a/test_main.py
+++ /dev/null
@@ -1,231 +0,0 @@
-from my_lib.extract import extract
-from my_lib.load import transform_n_load
-from my_lib.util import log_tests
-import os
-import pyspark
-
-
-from my_lib.crud import (
-    read_data,
-    read_all_data,
-    save_data,
-    delete_data,
-    update_data,
-    get_table_columns,
-)
-
-# Maps each column name to its index in the extracted CSV.
-column_map = {
-    "air_quality_id": 0,
-    "indicator_id": 1,
-    "indicator_name": 2,
-    "measure": 3,
-    "measure_info": 4,
-    "geo_type_name": 5,
-    "geo_id": 6,
-    "geo_place_name": 7,
-    "time_period": 8,
-    "start_date": 9,
-    "data_value": 10,
-    "fn_geo_id": 6,
-    "fn_indicator_id": 1,
-}
-
-
-# Test extract
-def test_extract():
-    log_tests("Extraction Test", header=True, new_log_file=True)
-    log_tests("Removing existing CSV file if it exists")
-    if os.path.exists("data/air_quality.csv"):
-        os.remove("data/air_quality.csv")
-
-    log_tests("Confirming that CSV file doesn't exist...")
-    assert not os.path.exists("data/air_quality.csv")
-    log_tests("Test Successful")
-
-    log_tests("Extracting data and saving...")
-    extract(
-        "https://data.cityofnewyork.us/resource/c3uy-2p5r.csv?$limit=200000",
-        "air_quality.csv",
-    )
-
-    log_tests("Testing if CSV file exists...")
-    assert os.path.exists("data/air_quality.csv")
-    log_tests("Extraction Test Successful", last_in_group=True)
-    print("Extraction Test Successful")
-
-
-# Test transform and load
-def test_transform_and_load():
-    log_tests("Transform and Load Test", header=True)
-    transform_n_load(
-        local_dataset="air_quality.csv",
-        database_name="air_quality.db",
-        new_data_tables={
-            "air_quality": [
-                "air_quality_id",
-                "fn_indicator_id",
-                "fn_geo_id",
-                "time_period",
-                "start_date",
-                "data_value",
-            ],
-        },
-        new_lookup_tables={
-            "indicator": ["indicator_id", "indicator_name", "measure", "measure_info"],
-            "geo_data": ["geo_id", "geo_place_name", "geo_type_name"],
-        },
-        column_attributes={
-            "air_quality_id": "INTEGER PRIMARY KEY",
-            "indicator_id": "INTEGER PRIMARY KEY",
-            "indicator_name": "TEXT",
-            "measure": "TEXT",
-            "measure_info": "TEXT",
-            "geo_type_name": "TEXT",
-            "geo_id": "INTEGER PRIMARY KEY",
-            "geo_place_name": "TEXT",
-            "time_period": "TEXT",
-            "start_date": "TEXT",
-            "data_value": "REAL",
-            "fn_indicator_id": "INTEGER",
-            "fn_geo_id": "INTEGER",
-        },
-        column_map=column_map,  # identical module-level map defined above
-    )
-    log_tests("Transform and Load Test Successful", last_in_group=True)
-    print("Transform and Load Test Successful")
-
-
-# Test read data
-def test_read_data():
-    log_tests("One Record Reading Test", header=True)
-    row = read_data("air_quality.db", "air_quality", 740885)
-    data_value = 5  # index of the data_value column in the fetched row
-    log_tests("Asserting that row[0][data_value] == 16.4")
-    assert row[0][data_value] == 16.4
-    log_tests("Assert Successful")
-    log_tests("One Record Reading Test Successful", last_in_group=True)
-    print("One Record Reading Test Successful")
-
-
-# Test read all data
-def test_read_all_data():
-    log_tests("All Records Reading Test", header=True)
-    rows = read_all_data("air_quality.db", "air_quality")
-    log_tests("Asserting that len(rows) == 18016")
-    assert len(rows) == 18016
-    log_tests("All Records Reading Test Successful", last_in_group=True)
-    print("All Records Reading Test Successful")
-
-
-# Test save data
-def test_save_data():
-    log_tests("Record Saving Test", header=True)
-
-    log_tests("Asserting there's no record in geo_data with ID 100000")
-    result = read_data("air_quality.db", "geo_data", 100000)
-    assert result is None
-    log_tests("Assert Successful")
-
-    log_tests("Saving new record with ID 100000")
-    save_data("air_quality.db", "geo_data", ["100000", "Lancaster", "UFO"])
-
-    log_tests("Asserting there's now a record in geo_data with ID 100000")
-    result = read_data("air_quality.db", "geo_data", 100000)
-    assert len(result) == 1
-    log_tests("Assert Successful")
-
-    log_tests("Record Saving Test Successful", last_in_group=True)
-    print("Record Saving Test Successful")
-
-
-# Test update data
-def test_update_data():
-    log_tests("Record Update Test", header=True)
-
-    log_tests("Asserting 'geo_place_name' in geo_data for row ID 100000 is 'Lancaster'")
-    result = read_data("air_quality.db", "geo_data", 100000)
-    assert result[0][1] == "Lancaster"
-    log_tests("Assert Successful")
-
-    log_tests("Updating 'geo_place_name' in geo_data for row ID 100000 to 'Duke'")
-    update_data("air_quality.db", "geo_data", {"geo_place_name": "Duke"}, 100000)
-
-    log_tests("Asserting 'geo_place_name' in geo_data for row ID 100000 is now 'Duke'")
-    result = read_data("air_quality.db", "geo_data", 100000)
-    assert result[0][1] == "Duke"
-    log_tests("Assert Successful")
-
-    log_tests("Record Update Test Successful", last_in_group=True)
-    print("Record Update Test Successful")
-
-
-# Test delete data
-def test_delete_data():
-    log_tests("Record Deletion Test", header=True)
-
-    log_tests("Asserting there's a record in geo_data for row ID 100000")
-    result = read_data("air_quality.db", "geo_data", 100000)
-    assert len(result) == 1
-    log_tests("Assert Successful")
-
-    log_tests("Deleting record with ID 100000")
-    print(delete_data("air_quality.db", "geo_data", 100000))
-
-    log_tests("Asserting there's no record in geo_data with ID 100000")
-    result = read_data("air_quality.db", "geo_data", 100000)
-    assert result is None
-    log_tests("Assert Successful")
-
-    log_tests("Record Deletion Test Successful", last_in_group=True)
-    print("Record Deletion Test Successful")
-
-
-# Test read all column names
-def test_get_table_columns():
-    log_tests("Reading All Columns Test", header=True)
-
-    columns = get_table_columns("air_quality.db", "air_quality")
"air_quality") - - log_tests("Asserting the air_quality table has six (6) columns") - assert len(columns) == 6 - log_tests("Assert Successful") - - log_tests("Reading All Column Test Successful", last_in_group=True) - print("Reading All Column Test Successful") - - -# Two addtional queries to meet requirements -def execute_two_addtional_queries(): - print("****************Data in Geo_Data*************************") - rows = read_all_data("air_quality.db", "geo_data") - print("The number of rows retrieved is: ", len(rows)) - print("****************Data in Indicator*************************") - print(read_all_data("air_quality.db", "indicator")) - - -if __name__ == "__main__": - # test_extract() - # test_transform_and_load() - # test_read_data() - # test_read_all_data() - # test_save_data() - # test_update_data() - # test_delete_data() - # test_get_table_columns() - # execute_two_addtional_queries() - print(pyspark.__version__) \ No newline at end of file