From 961ecdf7507d7f44d86876b0dccafb65ea64ffdd Mon Sep 17 00:00:00 2001 From: Federico Zambelli Date: Mon, 23 Sep 2024 10:25:43 +0200 Subject: [PATCH 1/5] chore: Include .envrc to repo and a bunch of goodies - Added task runner Taskfile.dev and basic Taskfile - Bumped devenv.lock for devenv v1.1 - More informative pytest output --- .envrc | 3 +++ .gitignore | 4 +++- Taskfile.yml | 32 ++++++++++++++++++++++++++++++++ devenv.lock | 32 ++++++++++---------------------- devenv.nix | 1 + pyproject.toml | 1 + 6 files changed, 50 insertions(+), 23 deletions(-) create mode 100644 .envrc create mode 100644 Taskfile.yml diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..247a291 --- /dev/null +++ b/.envrc @@ -0,0 +1,3 @@ +source_url "https://raw.githubusercontent.com/cachix/devenv/95f329d49a8a5289d31e0982652f7058a189bfca/direnvrc" "sha256-d+8cBpDfDBj41inrADaJt+bDWhOktwslgoP5YiGJ1v0=" + +use devenv diff --git a/.gitignore b/.gitignore index ae5ac65..3b574b4 100644 --- a/.gitignore +++ b/.gitignore @@ -179,4 +179,6 @@ devenv.local.nix ### direnv ### .direnv -.envrc + +### vscode ### +.vscode diff --git a/Taskfile.yml b/Taskfile.yml new file mode 100644 index 0000000..19ef6fb --- /dev/null +++ b/Taskfile.yml @@ -0,0 +1,32 @@ +version: '3' + +tasks: + default: + internal: false + silent: true + # yamllint disable rule:line-length + cmds: + - 'echo -e "\n👋 Welcome to Task! To see available commands, run: task -l"' + - 'echo -e "📝 To see instructions for a specific command run: task command-name --summary\n"' + - task -l --sort none + # yamllint enable rule:line-length + + up: + desc: Start processes defined in `devenv.nix` in the background. + internal: false + silent: true + cmd: devenv up -d + + down: + desc: Stop processes defined in `devenv.nix` in the background. 
+ internal: false + silent: true + cmd: devenv processes down + + test: + desc: Run pytest + internal: false + silent: true + cmds: + - pytest || true + - rm -rf ./tmp diff --git a/devenv.lock b/devenv.lock index 04b19b9..bb087c9 100644 --- a/devenv.lock +++ b/devenv.lock @@ -3,11 +3,10 @@ "devenv": { "locked": { "dir": "src/modules", - "lastModified": 1726050924, + "lastModified": 1726826452, "owner": "cachix", "repo": "devenv", - "rev": "cf471f691c3765ed431199f61b8bd70530bc4305", - "treeHash": "04a5d566820f8fb1955c7c49ccbd3ff95d9129d7", + "rev": "2bdf6461e88c7e93b94d72d8b11d5a61f167cbf5", "type": "github" }, "original": { @@ -24,7 +23,6 @@ "owner": "edolstra", "repo": "flake-compat", "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", - "treeHash": "2addb7b71a20a25ea74feeaf5c2f6a6b30898ecb", "type": "github" }, "original": { @@ -40,7 +38,6 @@ "owner": "edolstra", "repo": "flake-compat", "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", - "treeHash": "2addb7b71a20a25ea74feeaf5c2f6a6b30898ecb", "type": "github" }, "original": { @@ -54,11 +51,10 @@ "systems": "systems" }, "locked": { - "lastModified": 1710146030, + "lastModified": 1726560853, "owner": "numtide", "repo": "flake-utils", - "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", - "treeHash": "bd263f021e345cb4a39d80c126ab650bebc3c10c", + "rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a", "type": "github" }, "original": { @@ -79,7 +75,6 @@ "owner": "hercules-ci", "repo": "gitignore.nix", "rev": "637db329424fd7e46cf4185293b9cc8c88c95394", - "treeHash": "ca14199cabdfe1a06a7b1654c76ed49100a689f9", "type": "github" }, "original": { @@ -94,7 +89,6 @@ "owner": "rrbutani", "repo": "nix-mk-shell-bin", "rev": "ff5d8bd4d68a347be5042e2f16caee391cd75887", - "treeHash": "496327dabdc787353a29987f492dd4939151baad", "type": "github" }, "original": { @@ -115,7 +109,6 @@ "owner": "nlewo", "repo": "nix2container", "rev": "fa6bb0a1159f55d071ba99331355955ae30b3401", - "treeHash": "a934d246fadcf8b36d28f3577fad413f5ab3f7d3", 
"type": "github" }, "original": { @@ -126,11 +119,10 @@ }, "nixpkgs": { "locked": { - "lastModified": 1726042813, + "lastModified": 1726871744, "owner": "NixOS", "repo": "nixpkgs", - "rev": "159be5db480d1df880a0135ca0bfed84c2f88353", - "treeHash": "1d972c98784fcd0d00eb59fa5a5966503cb5720f", + "rev": "a1d92660c6b3b7c26fb883500a80ea9d33321be2", "type": "github" }, "original": { @@ -152,7 +144,6 @@ "owner": "cachix", "repo": "nixpkgs-python", "rev": "7c550bca7e6cf95898e32eb2173efe7ebb447460", - "treeHash": "d9d38ef1b6fc92be18170b74e9889a7ab9174f6e", "type": "github" }, "original": { @@ -163,11 +154,10 @@ }, "nixpkgs-stable": { "locked": { - "lastModified": 1725826545, + "lastModified": 1726838390, "owner": "NixOS", "repo": "nixpkgs", - "rev": "f4c846aee8e1e29062aa8514d5e0ab270f4ec2f9", - "treeHash": "8fc49deaed3f2728a7147c38163cc468a117570a", + "rev": "944b2aea7f0a2d7c79f72468106bc5510cbf5101", "type": "github" }, "original": { @@ -187,11 +177,10 @@ "nixpkgs-stable": "nixpkgs-stable" }, "locked": { - "lastModified": 1725513492, + "lastModified": 1726745158, "owner": "cachix", "repo": "pre-commit-hooks.nix", - "rev": "7570de7b9b504cfe92025dd1be797bf546f66528", - "treeHash": "4b46d77870afecd8f642541cb4f4927326343b59", + "rev": "4e743a6920eab45e8ba0fbe49dc459f1423a4b74", "type": "github" }, "original": { @@ -216,7 +205,6 @@ "owner": "nix-systems", "repo": "default", "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", - "treeHash": "cce81f2a0f0743b2eb61bc2eb6c7adbe2f2c6beb", "type": "github" }, "original": { diff --git a/devenv.nix b/devenv.nix index 3988232..09ad44c 100644 --- a/devenv.nix +++ b/devenv.nix @@ -51,6 +51,7 @@ in tealdeer docker docker-compose + go-task # Python Dependencies (python3.withPackages python-packages) diff --git a/pyproject.toml b/pyproject.toml index 19d0c05..7ae3021 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ source = [ skip_empty = true [tool.pytest.ini_options] +addopts = "-vv --tb=auto --disable-warnings" pythonpath = 
[ "src" ] From 89baf53990ad4d84a91fc92d0fbd9a559554e0ac Mon Sep 17 00:00:00 2001 From: Federico Zambelli Date: Mon, 23 Sep 2024 14:43:38 +0200 Subject: [PATCH 2/5] chore: remove taskfile in favor of devenv scripts --- Taskfile.yml | 32 -------------------------------- devenv.nix | 11 ++++++++++- 2 files changed, 10 insertions(+), 33 deletions(-) delete mode 100644 Taskfile.yml diff --git a/Taskfile.yml b/Taskfile.yml deleted file mode 100644 index 19ef6fb..0000000 --- a/Taskfile.yml +++ /dev/null @@ -1,32 +0,0 @@ -version: '3' - -tasks: - default: - internal: false - silent: true - # yamllint disable rule:line-length - cmds: - - 'echo -e "\n👋 Welcome to Task! To see available commands, run: task -l"' - - 'echo -e "📝 To see instructions for a specific command run: task command-name --summary\n"' - - task -l --sort none - # yamllint enable rule:line-length - - up: - desc: Start processes defined in `devenv.nix` in the background. - internal: false - silent: true - cmd: devenv up -d - - down: - desc: Stop processes defined in `devenv.nix` in the background. 
- internal: false - silent: true - cmd: devenv processes down - - test: - desc: Run pytest - internal: false - silent: true - cmds: - - pytest || true - - rm -rf ./tmp diff --git a/devenv.nix b/devenv.nix index 09ad44c..f779ae6 100644 --- a/devenv.nix +++ b/devenv.nix @@ -43,6 +43,16 @@ in ''; }; + # convenient shortcuts + scripts.up.exec = "devenv up -d"; + scripts.up.description = "Start processes defined in devenv.nix in the background."; + + scripts.down.exec = "devenv processes down"; + scripts.down.description = "Stop processes defined in devenv.nix in the background."; + + scripts.tests.exec = "pytest && rm -rf ./tmp"; + scripts.tests.description = "Run pytest and cleanup."; + # https://devenv.sh/packages/ packages = with pkgs; [ nixfmt-rfc-style @@ -51,7 +61,6 @@ in tealdeer docker docker-compose - go-task # Python Dependencies (python3.withPackages python-packages) From 7b9cad9947fccb02555433a36c42a1d30d266027 Mon Sep 17 00:00:00 2001 From: Federico Zambelli Date: Mon, 23 Sep 2024 16:02:12 +0200 Subject: [PATCH 3/5] chore: add cmd to pretty print scripts and descriptions --- devenv.nix | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/devenv.nix b/devenv.nix index f779ae6..cfc8fbd 100644 --- a/devenv.nix +++ b/devenv.nix @@ -45,14 +45,34 @@ in # convenient shortcuts scripts.up.exec = "devenv up -d"; - scripts.up.description = "Start processes defined in devenv.nix in the background."; + scripts.up.description = "Start processes in the background."; scripts.down.exec = "devenv processes down"; - scripts.down.description = "Stop processes defined in devenv.nix in the background."; + scripts.down.description = "Stop processes."; scripts.tests.exec = "pytest && rm -rf ./tmp"; scripts.tests.description = "Run pytest and cleanup."; + scripts.show.exec = '' + GREEN="\033[0;32m"; + YELLOW="\033[33m"; + NC="\033[0m"; + echo + echo -e "✨ Helper scripts you can run to make your development richer:" + echo + 
${pkgs.gnused}/bin/sed -e 's| |••|g' -e 's|=| |' < Date: Mon, 23 Sep 2024 16:29:01 +0200 Subject: [PATCH 4/5] replace local ./tmp with system /tmp Also: add more informative schema-registry health check msg. --- .test.sh | 4 +++ devenv.nix | 4 +-- src/sparkle/application/spark.py | 10 +++--- tests/unit/reader/test_table_reader.py | 14 ++++---- tests/unit/writer/test_iceberg_writer.py | 41 +++++++----------------- tests/unit/writer/test_kafka_writer.py | 15 +++++---- 6 files changed, 35 insertions(+), 53 deletions(-) diff --git a/.test.sh b/.test.sh index 2cc4cb6..9a51468 100755 --- a/.test.sh +++ b/.test.sh @@ -12,6 +12,9 @@ check_health() { return 0 else echo "Container 'schema-registry' is not healthy. Current status: $STATUS" + echo "Did you remember to start the processes?" + echo "You can do so by running \`up\` in the terminal." + echo return 1 fi } @@ -22,6 +25,7 @@ while true; do break else echo "Retrying in 5 seconds..." + echo sleep 5 fi done diff --git a/devenv.nix b/devenv.nix index cfc8fbd..ba9f7e0 100644 --- a/devenv.nix +++ b/devenv.nix @@ -50,8 +50,8 @@ in scripts.down.exec = "devenv processes down"; scripts.down.description = "Stop processes."; - scripts.tests.exec = "pytest && rm -rf ./tmp"; - scripts.tests.description = "Run pytest and cleanup."; + scripts.tests.exec = "./.test.sh"; + scripts.tests.description = "Run tests."; scripts.show.exec = '' GREEN="\033[0;32m"; diff --git a/src/sparkle/application/spark.py b/src/sparkle/application/spark.py index e7cf273..d7bb620 100644 --- a/src/sparkle/application/spark.py +++ b/src/sparkle/application/spark.py @@ -1,7 +1,9 @@ import os + from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql import SparkSession + from sparkle.utils.logger import logger try: @@ -40,7 +42,7 @@ def get_local_session(): "spark.sql.catalog.spark_catalog.type": "hive", "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.local.type": "hadoop", - 
"spark.sql.catalog.local.warehouse": "./tmp/warehouse", + "spark.sql.catalog.local.warehouse": "/tmp/warehouse", "spark.sql.defaultCatalog": "local", } @@ -49,11 +51,7 @@ def get_local_session(): for key, value in LOCAL_CONFIG.items(): spark_conf.set(key, str(value)) - spark_session = ( - SparkSession.builder.master("local[*]") - .appName("LocalSparkleApp") - .config(conf=spark_conf) - ) + spark_session = SparkSession.builder.master("local[*]").appName("LocalSparkleApp").config(conf=spark_conf) if ivy_settings_path: spark_session.config("spark.jars.ivySettings", ivy_settings_path) diff --git a/tests/unit/reader/test_table_reader.py b/tests/unit/reader/test_table_reader.py index 4d63d62..6ec63f1 100644 --- a/tests/unit/reader/test_table_reader.py +++ b/tests/unit/reader/test_table_reader.py @@ -1,13 +1,15 @@ import os + import pytest from pyspark.sql import DataFrame + +from sparkle.config import Config, TableConfig from sparkle.reader.table_reader import TableReader -from sparkle.config import TableConfig, Config TEST_DB = "test_db" TEST_TABLE = "test_table" CATALOG = "glue_catalog" -WAREHOUSE = "./tmp/warehouse" +WAREHOUSE = "/tmp/warehouse" @pytest.fixture @@ -104,12 +106,8 @@ def test_read_table(spark_session, test_db_path): test_db_path (str): Path to the test database. 
""" # Create a sample table for testing - spark_session.sql( - f"CREATE TABLE {CATALOG}.{TEST_DB}.{TEST_TABLE} (id INT, name STRING)" - ) - spark_session.sql( - f"INSERT INTO {CATALOG}.{TEST_DB}.{TEST_TABLE} VALUES (1, 'Alice'), (2, 'Bob')" - ) + spark_session.sql(f"CREATE TABLE {CATALOG}.{TEST_DB}.{TEST_TABLE} (id INT, name STRING)") + spark_session.sql(f"INSERT INTO {CATALOG}.{TEST_DB}.{TEST_TABLE} VALUES (1, 'Alice'), (2, 'Bob')") reader = TableReader( spark=spark_session, diff --git a/tests/unit/writer/test_iceberg_writer.py b/tests/unit/writer/test_iceberg_writer.py index 1b7f61b..c6fc097 100644 --- a/tests/unit/writer/test_iceberg_writer.py +++ b/tests/unit/writer/test_iceberg_writer.py @@ -1,15 +1,16 @@ import datetime import os + import pytest from pyspark.sql.functions import days from sparkle.application import PROCESS_TIME_COLUMN -from sparkle.writer.iceberg_writer import IcebergWriter from sparkle.utils.spark import table_exists +from sparkle.writer.iceberg_writer import IcebergWriter TEST_DB = "default" TEST_TABLE = "test_table" -WAREHOUSE = "./tmp/warehouse" +WAREHOUSE = "/tmp/warehouse" CATALOG = "glue_catalog" @@ -51,16 +52,12 @@ def partition_df(spark_session): { "user_id": 1, "name": "Bob", - PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace( - tzinfo=datetime.timezone.utc - ), + PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace(tzinfo=datetime.timezone.utc), }, { "user_id": 2, "name": "Alice", - PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-02").replace( - tzinfo=datetime.timezone.utc - ), + PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-02").replace(tzinfo=datetime.timezone.utc), }, ] return spark_session.createDataFrame(data) @@ -84,9 +81,7 @@ def partition_df_evolved_schema(spark_session): "user_id": 1, "name": "Bob", "new_field": "new_field_value", - PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace( - tzinfo=datetime.timezone.utc - ), + 
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace(tzinfo=datetime.timezone.utc), } ] return spark_session.createDataFrame(data) @@ -112,9 +107,7 @@ def test_writer_should_write_iceberg(user_dataframe, test_db_path, spark_session writer.write(user_dataframe) - assert table_exists( - database_name=TEST_DB, table_name=TEST_TABLE, spark=spark_session - ) + assert table_exists(database_name=TEST_DB, table_name=TEST_TABLE, spark=spark_session) def test_write_with_partitions(test_db_path, partition_df, spark_session): @@ -138,21 +131,11 @@ def test_write_with_partitions(test_db_path, partition_df, spark_session): writer.write(partition_df) - assert os.path.exists( - os.path.join( - test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-02" - ) - ) - assert os.path.exists( - os.path.join( - test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-03" - ) - ) + assert os.path.exists(os.path.join(test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-02")) + assert os.path.exists(os.path.join(test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-03")) -def test_write_with_partitions_no_partition_column_provided( - test_db_path, partition_df, spark_session -): +def test_write_with_partitions_no_partition_column_provided(test_db_path, partition_df, spark_session): """Test writing data to Iceberg without specifying partitions. This test verifies that data is written to the Iceberg table without any partitions @@ -176,9 +159,7 @@ def test_write_with_partitions_no_partition_column_provided( assert os.path.exists(os.path.join(test_db_path, TEST_TABLE, "data")) -def test_write_with_schema_evolution( - test_db_path, partition_df, partition_df_evolved_schema, spark_session -): +def test_write_with_schema_evolution(test_db_path, partition_df, partition_df_evolved_schema, spark_session): """Test writing data to Iceberg with schema evolution. 
This test checks that the Iceberg table correctly handles schema evolution by diff --git a/tests/unit/writer/test_kafka_writer.py b/tests/unit/writer/test_kafka_writer.py index 717de02..cb8290d 100644 --- a/tests/unit/writer/test_kafka_writer.py +++ b/tests/unit/writer/test_kafka_writer.py @@ -1,12 +1,13 @@ -import pytest -from typing import Any import os import shutil -from sparkle.writer.kafka_writer import KafkaStreamPublisher -from pyspark.sql.functions import floor, rand -from pyspark.sql import DataFrame -from pyspark.sql import SparkSession import time +from typing import Any + +import pytest +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.functions import floor, rand + +from sparkle.writer.kafka_writer import KafkaStreamPublisher @pytest.fixture @@ -23,7 +24,7 @@ def kafka_config() -> dict[str, Any]: "kafka.bootstrap.servers": "localhost:9092", "kafka.security.protocol": "PLAINTEXT", }, - "checkpoint_location": "./tmp/checkpoint", + "checkpoint_location": "/tmp/checkpoint", "kafka_topic": "test-kafka-writer-topic", "output_mode": "append", "unique_identifier_column_name": "id", From 46d5d7a1fb45c7b8d11a929586a93010e2fa0e41 Mon Sep 17 00:00:00 2001 From: Federico Zambelli Date: Mon, 23 Sep 2024 16:41:39 +0200 Subject: [PATCH 5/5] Remove test custom script, unnecessary --- .test.sh | 4 ---- devenv.nix | 3 --- 2 files changed, 7 deletions(-) diff --git a/.test.sh b/.test.sh index 9a51468..2cc4cb6 100755 --- a/.test.sh +++ b/.test.sh @@ -12,9 +12,6 @@ check_health() { return 0 else echo "Container 'schema-registry' is not healthy. Current status: $STATUS" - echo "Did you remember to start the processes?" - echo "You can do so by running \`up\` in the terminal." - echo return 1 fi } @@ -25,7 +22,6 @@ while true; do break else echo "Retrying in 5 seconds..." 
- echo sleep 5 fi done diff --git a/devenv.nix b/devenv.nix index ba9f7e0..b2a3c24 100644 --- a/devenv.nix +++ b/devenv.nix @@ -50,9 +50,6 @@ in scripts.down.exec = "devenv processes down"; scripts.down.description = "Stop processes."; - scripts.tests.exec = "./.test.sh"; - scripts.tests.description = "Run tests."; - scripts.show.exec = '' GREEN="\033[0;32m"; YELLOW="\033[33m";