Merge pull request #18 from DataChefHQ/feature/minor-changes
chore: Include .envrc to repo and a bunch of goodies
wtfzambo authored Sep 23, 2024
2 parents 2cb5e1c + 46d5d7a commit 54dfbbc
Showing 9 changed files with 74 additions and 74 deletions.
3 changes: 3 additions & 0 deletions .envrc
@@ -0,0 +1,3 @@
source_url "https://raw.githubusercontent.com/cachix/devenv/95f329d49a8a5289d31e0982652f7058a189bfca/direnvrc" "sha256-d+8cBpDfDBj41inrADaJt+bDWhOktwslgoP5YiGJ1v0="

use devenv
4 changes: 3 additions & 1 deletion .gitignore
@@ -179,4 +179,6 @@ devenv.local.nix

### direnv ###
.direnv
.envrc

### vscode ###
.vscode
32 changes: 10 additions & 22 deletions devenv.lock
@@ -3,11 +3,10 @@
"devenv": {
"locked": {
"dir": "src/modules",
"lastModified": 1726050924,
"lastModified": 1726826452,
"owner": "cachix",
"repo": "devenv",
"rev": "cf471f691c3765ed431199f61b8bd70530bc4305",
"treeHash": "04a5d566820f8fb1955c7c49ccbd3ff95d9129d7",
"rev": "2bdf6461e88c7e93b94d72d8b11d5a61f167cbf5",
"type": "github"
},
"original": {
@@ -24,7 +23,6 @@
"owner": "edolstra",
"repo": "flake-compat",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"treeHash": "2addb7b71a20a25ea74feeaf5c2f6a6b30898ecb",
"type": "github"
},
"original": {
@@ -40,7 +38,6 @@
"owner": "edolstra",
"repo": "flake-compat",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"treeHash": "2addb7b71a20a25ea74feeaf5c2f6a6b30898ecb",
"type": "github"
},
"original": {
@@ -54,11 +51,10 @@
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"lastModified": 1726560853,
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"treeHash": "bd263f021e345cb4a39d80c126ab650bebc3c10c",
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
"type": "github"
},
"original": {
@@ -79,7 +75,6 @@
"owner": "hercules-ci",
"repo": "gitignore.nix",
"rev": "637db329424fd7e46cf4185293b9cc8c88c95394",
"treeHash": "ca14199cabdfe1a06a7b1654c76ed49100a689f9",
"type": "github"
},
"original": {
@@ -94,7 +89,6 @@
"owner": "rrbutani",
"repo": "nix-mk-shell-bin",
"rev": "ff5d8bd4d68a347be5042e2f16caee391cd75887",
"treeHash": "496327dabdc787353a29987f492dd4939151baad",
"type": "github"
},
"original": {
@@ -115,7 +109,6 @@
"owner": "nlewo",
"repo": "nix2container",
"rev": "fa6bb0a1159f55d071ba99331355955ae30b3401",
"treeHash": "a934d246fadcf8b36d28f3577fad413f5ab3f7d3",
"type": "github"
},
"original": {
@@ -126,11 +119,10 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1726042813,
"lastModified": 1726871744,
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "159be5db480d1df880a0135ca0bfed84c2f88353",
"treeHash": "1d972c98784fcd0d00eb59fa5a5966503cb5720f",
"rev": "a1d92660c6b3b7c26fb883500a80ea9d33321be2",
"type": "github"
},
"original": {
@@ -152,7 +144,6 @@
"owner": "cachix",
"repo": "nixpkgs-python",
"rev": "7c550bca7e6cf95898e32eb2173efe7ebb447460",
"treeHash": "d9d38ef1b6fc92be18170b74e9889a7ab9174f6e",
"type": "github"
},
"original": {
@@ -163,11 +154,10 @@
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1725826545,
"lastModified": 1726838390,
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "f4c846aee8e1e29062aa8514d5e0ab270f4ec2f9",
"treeHash": "8fc49deaed3f2728a7147c38163cc468a117570a",
"rev": "944b2aea7f0a2d7c79f72468106bc5510cbf5101",
"type": "github"
},
"original": {
@@ -187,11 +177,10 @@
"nixpkgs-stable": "nixpkgs-stable"
},
"locked": {
"lastModified": 1725513492,
"lastModified": 1726745158,
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"rev": "7570de7b9b504cfe92025dd1be797bf546f66528",
"treeHash": "4b46d77870afecd8f642541cb4f4927326343b59",
"rev": "4e743a6920eab45e8ba0fbe49dc459f1423a4b74",
"type": "github"
},
"original": {
@@ -216,7 +205,6 @@
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"treeHash": "cce81f2a0f0743b2eb61bc2eb6c7adbe2f2c6beb",
"type": "github"
},
"original": {
28 changes: 28 additions & 0 deletions devenv.nix
@@ -43,6 +43,33 @@ in
'';
};

# convenient shortcuts
scripts.up.exec = "devenv up -d";
scripts.up.description = "Start processes in the background.";

scripts.down.exec = "devenv processes down";
scripts.down.description = "Stop processes.";

scripts.show.exec = ''
GREEN="\033[0;32m";
YELLOW="\033[33m";
NC="\033[0m";
echo
echo -e "✨ Helper scripts you can run to make your development richer:"
echo
${pkgs.gnused}/bin/sed -e 's| |••|g' -e 's|=| |' <<EOF | ${pkgs.util-linuxMinimal}/bin/column -t | ${pkgs.gnused}/bin/sed -e "s|^\([^ ]*\)|$(printf "$GREEN")\1$(printf "$NC"): |" -e "s|^|$(printf "$YELLOW*$NC") |" -e 's|••| |g'
${lib.generators.toKeyValue { } (
lib.mapAttrs (name: value: value.description) (
lib.filterAttrs (_: value: value.description != "") config.scripts
)
)}
EOF
echo
'';
scripts.show.description = "Print this message and exit.";

# https://devenv.sh/packages/
packages = with pkgs; [
nixfmt-rfc-style
@@ -82,6 +109,7 @@ in

enterShell = ''
hello
show
'';

# https://devenv.sh/pre-commit-hooks/
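
The new `show` script pipes every script description through `sed` and `column` to print an aligned, colour-coded help table. In spirit (illustration only, written in Python rather than the shell pipeline above), it does the equivalent of:

```python
# Illustration of what `show` prints: an aligned list of the defined scripts
# and their descriptions (ANSI colour codes omitted).
scripts = {
    "up": "Start processes in the background.",
    "down": "Stop processes.",
    "show": "Print this message and exit.",
}

width = max(len(name) for name in scripts)
for name, description in scripts.items():
    print(f"* {name.ljust(width)}: {description}")
```
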
1 change: 1 addition & 0 deletions pyproject.toml
@@ -70,6 +70,7 @@ source = [
skip_empty = true

[tool.pytest.ini_options]
addopts = "-vv --tb=auto --disable-warnings"
pythonpath = [
"src"
]
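
The new `addopts` line makes every pytest run verbose (`-vv`), use the automatic traceback style, and skip the warnings summary. Since `addopts` is picked up from `pyproject.toml` automatically, no extra flags are needed on the command line; for illustration only, the setting behaves like a programmatic invocation with the same options:

```python
# Illustration only: with addopts set in pyproject.toml these flags are applied
# automatically, so the repo never needs to invoke pytest like this.
import pytest

exit_code = pytest.main(["-vv", "--tb=auto", "--disable-warnings", "tests/unit"])
raise SystemExit(exit_code)
```
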
10 changes: 4 additions & 6 deletions src/sparkle/application/spark.py
@@ -1,7 +1,9 @@
import os

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

from sparkle.utils.logger import logger

try:
@@ -40,7 +42,7 @@ def get_local_session():
"spark.sql.catalog.spark_catalog.type": "hive",
"spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.local.type": "hadoop",
"spark.sql.catalog.local.warehouse": "./tmp/warehouse",
"spark.sql.catalog.local.warehouse": "/tmp/warehouse",
"spark.sql.defaultCatalog": "local",
}

@@ -49,11 +51,7 @@ def get_local_session():
for key, value in LOCAL_CONFIG.items():
spark_conf.set(key, str(value))

spark_session = (
SparkSession.builder.master("local[*]")
.appName("LocalSparkleApp")
.config(conf=spark_conf)
)
spark_session = SparkSession.builder.master("local[*]").appName("LocalSparkleApp").config(conf=spark_conf)

if ivy_settings_path:
spark_session.config("spark.jars.ivySettings", ivy_settings_path)
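
The hunk above keeps the same local Iceberg catalog settings but moves the warehouse from the relative `./tmp/warehouse` to the absolute `/tmp/warehouse` and collapses the builder chain onto a single line. A minimal sketch of how a subset of that configuration could be assembled into a local session (assuming the Iceberg runtime jars are on the classpath and that the code elided below the hunk ends in `getOrCreate()`):

```python
# Hedged sketch, not the repo's code: a few of the LOCAL_CONFIG keys from the
# diff above wired into a SparkConf-backed local session.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

LOCAL_CONFIG = {
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
    "spark.sql.catalog.local.warehouse": "/tmp/warehouse",  # absolute path after this commit
    "spark.sql.defaultCatalog": "local",
}

spark_conf = SparkConf()
for key, value in LOCAL_CONFIG.items():
    spark_conf.set(key, str(value))

# getOrCreate() is assumed; the diff is truncated before the session is built.
spark = (
    SparkSession.builder.master("local[*]")
    .appName("LocalSparkleApp")
    .config(conf=spark_conf)
    .getOrCreate()
)
```
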
14 changes: 6 additions & 8 deletions tests/unit/reader/test_table_reader.py
@@ -1,13 +1,15 @@
import os

import pytest
from pyspark.sql import DataFrame

from sparkle.config import Config, TableConfig
from sparkle.reader.table_reader import TableReader
from sparkle.config import TableConfig, Config

TEST_DB = "test_db"
TEST_TABLE = "test_table"
CATALOG = "glue_catalog"
WAREHOUSE = "./tmp/warehouse"
WAREHOUSE = "/tmp/warehouse"


@pytest.fixture
@@ -104,12 +106,8 @@ def test_read_table(spark_session, test_db_path):
test_db_path (str): Path to the test database.
"""
# Create a sample table for testing
spark_session.sql(
f"CREATE TABLE {CATALOG}.{TEST_DB}.{TEST_TABLE} (id INT, name STRING)"
)
spark_session.sql(
f"INSERT INTO {CATALOG}.{TEST_DB}.{TEST_TABLE} VALUES (1, 'Alice'), (2, 'Bob')"
)
spark_session.sql(f"CREATE TABLE {CATALOG}.{TEST_DB}.{TEST_TABLE} (id INT, name STRING)")
spark_session.sql(f"INSERT INTO {CATALOG}.{TEST_DB}.{TEST_TABLE} VALUES (1, 'Alice'), (2, 'Bob')")

reader = TableReader(
spark=spark_session,
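
The rewrapped statements above only change formatting. Because `TableReader`'s constructor is truncated in this view, the sketch below sticks to plain PySpark and shows how the seeded rows could be read back under the same catalog-qualified name (illustration only, not the `TableReader` API; `spark_session` is the fixture used by the test):

```python
# Plain PySpark illustration (not TableReader): read back the rows the test seeds.
CATALOG, TEST_DB, TEST_TABLE = "glue_catalog", "test_db", "test_table"

df = spark_session.table(f"{CATALOG}.{TEST_DB}.{TEST_TABLE}")
assert df.count() == 2
assert {row["name"] for row in df.collect()} == {"Alice", "Bob"}
```
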
41 changes: 11 additions & 30 deletions tests/unit/writer/test_iceberg_writer.py
@@ -1,15 +1,16 @@
import datetime
import os

import pytest
from pyspark.sql.functions import days

from sparkle.application import PROCESS_TIME_COLUMN
from sparkle.writer.iceberg_writer import IcebergWriter
from sparkle.utils.spark import table_exists
from sparkle.writer.iceberg_writer import IcebergWriter

TEST_DB = "default"
TEST_TABLE = "test_table"
WAREHOUSE = "./tmp/warehouse"
WAREHOUSE = "/tmp/warehouse"
CATALOG = "glue_catalog"


@@ -51,16 +52,12 @@ def partition_df(spark_session):
{
"user_id": 1,
"name": "Bob",
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace(
tzinfo=datetime.timezone.utc
),
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace(tzinfo=datetime.timezone.utc),
},
{
"user_id": 2,
"name": "Alice",
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-02").replace(
tzinfo=datetime.timezone.utc
),
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-02").replace(tzinfo=datetime.timezone.utc),
},
]
return spark_session.createDataFrame(data)
@@ -84,9 +81,7 @@ def partition_df_evolved_schema(spark_session):
"user_id": 1,
"name": "Bob",
"new_field": "new_field_value",
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace(
tzinfo=datetime.timezone.utc
),
PROCESS_TIME_COLUMN: datetime.datetime.fromisoformat("2023-11-03").replace(tzinfo=datetime.timezone.utc),
}
]
return spark_session.createDataFrame(data)
@@ -112,9 +107,7 @@ def test_writer_should_write_iceberg(user_dataframe, test_db_path, spark_session

writer.write(user_dataframe)

assert table_exists(
database_name=TEST_DB, table_name=TEST_TABLE, spark=spark_session
)
assert table_exists(database_name=TEST_DB, table_name=TEST_TABLE, spark=spark_session)


def test_write_with_partitions(test_db_path, partition_df, spark_session):
@@ -138,21 +131,11 @@ def test_write_with_partitions(test_db_path, partition_df, spark_session):

writer.write(partition_df)

assert os.path.exists(
os.path.join(
test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-02"
)
)
assert os.path.exists(
os.path.join(
test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-03"
)
)
assert os.path.exists(os.path.join(test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-02"))
assert os.path.exists(os.path.join(test_db_path, TEST_TABLE, "data", f"{PROCESS_TIME_COLUMN}_day=2023-11-03"))


def test_write_with_partitions_no_partition_column_provided(
test_db_path, partition_df, spark_session
):
def test_write_with_partitions_no_partition_column_provided(test_db_path, partition_df, spark_session):
"""Test writing data to Iceberg without specifying partitions.
This test verifies that data is written to the Iceberg table without any partitions
@@ -176,9 +159,7 @@ def test_write_with_partitions_no_partition_column_provided(
assert os.path.exists(os.path.join(test_db_path, TEST_TABLE, "data"))


def test_write_with_schema_evolution(
test_db_path, partition_df, partition_df_evolved_schema, spark_session
):
def test_write_with_schema_evolution(test_db_path, partition_df, partition_df_evolved_schema, spark_session):
"""Test writing data to Iceberg with schema evolution.
This test checks that the Iceberg table correctly handles schema evolution by
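
The partition assertions above hinge on Iceberg's day transform: each row lands in a directory named after the calendar day of its UTC process-time value, which is why the test looks for `_day=2023-11-02` and `_day=2023-11-03` suffixes under the warehouse. A small illustration of that mapping (the exact directory prefix comes from `PROCESS_TIME_COLUMN`, imported from `sparkle.application`):

```python
# Illustration of how a UTC process-time value maps to the day-partition
# directory suffix asserted in test_write_with_partitions.
import datetime

ts = datetime.datetime.fromisoformat("2023-11-03").replace(tzinfo=datetime.timezone.utc)
day_suffix = ts.date().isoformat()
print(day_suffix)  # "2023-11-03" -> ".../data/<PROCESS_TIME_COLUMN>_day=2023-11-03"
```
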
15 changes: 8 additions & 7 deletions tests/unit/writer/test_kafka_writer.py
@@ -1,12 +1,13 @@
import pytest
from typing import Any
import os
import shutil
from sparkle.writer.kafka_writer import KafkaStreamPublisher
from pyspark.sql.functions import floor, rand
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
import time
from typing import Any

import pytest
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import floor, rand

from sparkle.writer.kafka_writer import KafkaStreamPublisher


@pytest.fixture
Expand All @@ -23,7 +24,7 @@ def kafka_config() -> dict[str, Any]:
"kafka.bootstrap.servers": "localhost:9092",
"kafka.security.protocol": "PLAINTEXT",
},
"checkpoint_location": "./tmp/checkpoint",
"checkpoint_location": "/tmp/checkpoint",
"kafka_topic": "test-kafka-writer-topic",
"output_mode": "append",
"unique_identifier_column_name": "id",
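
The fixture's checkpoint location likewise moves from the relative `./tmp/checkpoint` to `/tmp/checkpoint`. `KafkaStreamPublisher`'s implementation is not part of this diff; as a generic Structured Streaming sketch, options shaped like the fixture's are typically wired into a Kafka sink roughly as follows (key names taken from the fixture; the publisher's real wiring may differ):

```python
# Generic Structured Streaming sketch, not KafkaStreamPublisher's actual code:
# shows how options like the fixture's commonly map onto a Kafka sink.
from typing import Any

from pyspark.sql import DataFrame


def publish(df: DataFrame, config: dict[str, Any]) -> None:
    key_col = config["unique_identifier_column_name"]
    (
        df.selectExpr(f"CAST({key_col} AS STRING) AS key", "to_json(struct(*)) AS value")
        .writeStream.format("kafka")
        .option("kafka.bootstrap.servers", config["kafka_options"]["kafka.bootstrap.servers"])
        .option("topic", config["kafka_topic"])
        .option("checkpointLocation", config["checkpoint_location"])
        .outputMode(config["output_mode"])
        .start()
    )
```
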
