diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 821be1c..ff70e3f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,19 +7,45 @@ on: - main jobs: - tests: - strategy: - fail-fast: false # Prevents the matrix from failing fast - matrix: - os: [ubuntu-latest, macos-latest] - runs-on: ${{ matrix.os }} + # Mac tests and Ubuntu tests are separated so that Ubuntu tests can + # run on both PRs and main; and Mac tests only on main branch. + ubuntu_tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: cachix/install-nix-action@v26 + - uses: cachix/cachix-action@v14 + with: + name: devenv + + - name: Install devenv.sh + run: nix profile install nixpkgs#devenv + + - name: Build the devenv shell and run any pre-commit hooks + run: devenv test + macos_tests: + if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + runs-on: macos-latest steps: - uses: actions/checkout@v4 + + - name: Setup Docker (macOS) with Colima + run: | + brew install docker docker-compose colima + colima delete + colima start --arch x86_64 + echo $SHELL + sudo ln -sf $HOME/.colima/default/docker.sock /var/run/docker.sock + docker --version + curl -s --unix-socket $HOME/.colima/default/docker.sock http/_ping + - uses: cachix/install-nix-action@v26 - uses: cachix/cachix-action@v14 with: name: devenv + - name: Install devenv.sh run: nix profile install nixpkgs#devenv diff --git a/.test.sh b/.test.sh index 8c0cd1d..2cc4cb6 100755 --- a/.test.sh +++ b/.test.sh @@ -1,5 +1,30 @@ #!/usr/bin/env bash set -euo pipefail +SERVICE_NAME="schema-registry" + +# Function to check if the container is healthy +check_health() { + STATUS=$(docker inspect --format='{{.State.Health.Status}}' schema-registry 2>/dev/null) + + if [ "$STATUS" == "healthy" ]; then + echo "Container 'schema-registry' is healthy." + return 0 + else + echo "Container 'schema-registry' is not healthy. Current status: $STATUS" + return 1 + fi +} + +# Loop until the container is healthy +while true; do + if check_health; then + break + else + echo "Retrying in 5 seconds..." + sleep 5 + fi +done + python -m mypy . python -m pytest -vv diff --git a/devenv.nix b/devenv.nix index 1f540b4..3988232 100644 --- a/devenv.nix +++ b/devenv.nix @@ -75,7 +75,9 @@ in languages.java.jdk.package = pkgs.jdk8; # Java version running on AWS Glue processes = { - kafka-test.exec = "docker compose -f tests/docker-compose.yml up --build"; + kafka-test.exec = '' + docker compose -f tests/docker-compose.yml up --build + ''; }; enterShell = '' diff --git a/poetry.lock b/poetry.lock index 84467d6..2399606 100644 --- a/poetry.lock +++ b/poetry.lock @@ -33,6 +33,17 @@ docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphi tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] +[[package]] +name = "certifi" +version = "2024.8.30" +description = "Python package for providing Mozilla's CA Bundle." 
+optional = false +python-versions = ">=3.6" +files = [ + {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, + {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, +] + [[package]] name = "charset-normalizer" version = "3.3.2" @@ -166,6 +177,57 @@ questionary = ">=2.0,<3.0" termcolor = ">=1.1,<3" tomlkit = ">=0.5.3,<1.0.0" +[[package]] +name = "confluent-kafka" +version = "2.5.3" +description = "Confluent's Python client for Apache Kafka" +optional = false +python-versions = "*" +files = [ + {file = "confluent-kafka-2.5.3.tar.gz", hash = "sha256:eca625b0a8742d864a954bbe6493d453c07bacedf9e10d71a54dd1047f775778"}, + {file = "confluent_kafka-2.5.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:8a1a2a8756b2c1cd2654ea83d1e819a6e2c0a4337eacec50bfd2ab1f0c24a29c"}, + {file = "confluent_kafka-2.5.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c284eefed1b27133d90afc0fa2fd735864db8501190f3c2e0c8d8b1a20b07759"}, + {file = "confluent_kafka-2.5.3-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:46c6063726fcdae835902961bb6c0e4c148499b87fdd513e6b2a6b406922ae3e"}, + {file = "confluent_kafka-2.5.3-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:505078b402dde98dc06dc66b6356acd19984742ef6b82dd52fb860f2a14b5a57"}, + {file = "confluent_kafka-2.5.3-cp310-cp310-win_amd64.whl", hash = "sha256:db30418bb36723a02ba51e058312056d0403c5f245beb379bff66e4b0c14337b"}, + {file = "confluent_kafka-2.5.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4dd5fa74231fc21c3a26eeda1999a27f84768a6291a8b04c3cd61ac1deea4ace"}, + {file = "confluent_kafka-2.5.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ac8b5fe45ee9c11ce7a516dc7c41441ebb17d9ff63c8646a59b8e52bd791b154"}, + {file = "confluent_kafka-2.5.3-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:7125c3f86a76136b25aa21c94303b33709e2dd15f777395ea81fbd6872d9147b"}, + {file = "confluent_kafka-2.5.3-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:8ec7a407bcb2eb122ff159d602cedc41d858f4c66a436c778f5d2f9f15fbec4e"}, + {file = "confluent_kafka-2.5.3-cp311-cp311-win_amd64.whl", hash = "sha256:4cfb18d69e6912fe90cbbcc9c7d805988122c51ab3041e1424ace64bc31b736f"}, + {file = "confluent_kafka-2.5.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8d86de3e2c7bb59fb16faea468e833712912106f32a3a3ec345088c366042734"}, + {file = "confluent_kafka-2.5.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e9ffb298b3ea3477afdaa5da6033d35dc0be01b10537d9b63994411e79b41477"}, + {file = "confluent_kafka-2.5.3-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:61a92637bea8fca454ec711f46e7753647deb7da56132995103acb5eb5041a29"}, + {file = "confluent_kafka-2.5.3-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:3daad72791ae06dec257c9105278e89ae0924e86ef107a1acb443106f002f361"}, + {file = "confluent_kafka-2.5.3-cp312-cp312-win_amd64.whl", hash = "sha256:f626494cd6ad18fa2ed83f80d687bc0194cff6f61b3d4f2aa183efa23ede2e02"}, + {file = "confluent_kafka-2.5.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:f034f13c08ba238154d818294ceabb2257e8df8fb6489f891ec7600c7c541553"}, + {file = "confluent_kafka-2.5.3-cp36-cp36m-manylinux_2_28_aarch64.whl", hash = "sha256:806c43fd1524034a9b6c958b4f9395ff5f56ef697218a336eac1da5006184f66"}, + {file = "confluent_kafka-2.5.3-cp36-cp36m-manylinux_2_28_x86_64.whl", hash = "sha256:0cdb150c12d5ac6e33572cbf16243284c65a178e3719baa610a48d672e9d92bf"}, + {file = 
"confluent_kafka-2.5.3-cp36-cp36m-win_amd64.whl", hash = "sha256:a2ed265bf3420811efd802fd8ebf5ec0f20a82e9baeff5299a67f6a84dde1b06"}, + {file = "confluent_kafka-2.5.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:27d048429b138667c51541adc04bb398afa61a37a7be89f16ff9a318019d02c6"}, + {file = "confluent_kafka-2.5.3-cp37-cp37m-manylinux_2_28_aarch64.whl", hash = "sha256:eb80c22a7ca17839f229f299bafca1450c9fe4d5ca222e60e52428df91d42b56"}, + {file = "confluent_kafka-2.5.3-cp37-cp37m-manylinux_2_28_x86_64.whl", hash = "sha256:5122b8e9f94b6160d47e8f0020857376caa21f715b95c4b13c68683b47260c8f"}, + {file = "confluent_kafka-2.5.3-cp37-cp37m-win_amd64.whl", hash = "sha256:3b69c3120e0cac9ca463ca603ddc9d4e811409ef4ec69d2b6bb8bd94d6fce95e"}, + {file = "confluent_kafka-2.5.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:a6f152e704b01c6a726233d081921454b7de106a5e4036994d1d5f4b34e7e46f"}, + {file = "confluent_kafka-2.5.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:2b8eef8c2f963ca6f5fcc79a0d6edef4e25fba83dfc0ef3f0401e1644f60ff11"}, + {file = "confluent_kafka-2.5.3-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:0751302b0fd8090cbca92d7d34d237768923107b30de2611f3db93c2118cf2a8"}, + {file = "confluent_kafka-2.5.3-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:0e0cb3b18a59d1c6fcae60297ee25b5c65d5c39c8ad8033a8fa1392498a71c9e"}, + {file = "confluent_kafka-2.5.3-cp38-cp38-win_amd64.whl", hash = "sha256:0b14928cddba963ea7d1c66aa268b6d37976bc91b4cf2100b5b7336d848ced22"}, + {file = "confluent_kafka-2.5.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:dae80e9e1e4417462fe61f64da0ab111395719e35c9f7f3eac7c671ff5e868fe"}, + {file = "confluent_kafka-2.5.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:75e1da68b199ef2472e47785d9a5c2dc75d307ed78827ad929bb733728b18567"}, + {file = "confluent_kafka-2.5.3-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:fa2318eaa9b2d5f3ebc2022b71e4ebf6242c13963b4faccf46eea49fea0ad91f"}, + {file = "confluent_kafka-2.5.3-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:490836e9fc3b4489721327e3df987c8667916a97d178e2296913c8d5de6623a9"}, + {file = "confluent_kafka-2.5.3-cp39-cp39-win_amd64.whl", hash = "sha256:cfabe291cda68fdc3136f2f367dd4e5a6c3750113785f0c1936ba9cae09f4e9d"}, +] + +[package.extras] +avro = ["avro (>=1.11.1,<2)", "fastavro (>=0.23.0,<1.0)", "fastavro (>=1.0)", "requests"] +dev = ["avro (>=1.11.1,<2)", "fastavro (>=0.23.0,<1.0)", "fastavro (>=1.0)", "flake8", "pytest", "pytest (==4.6.4)", "pytest-timeout", "requests"] +doc = ["avro (>=1.11.1,<2)", "fastavro (>=0.23.0,<1.0)", "fastavro (>=1.0)", "requests", "sphinx", "sphinx-rtd-theme"] +json = ["jsonschema", "pyrsistent", "pyrsistent (==0.16.1)", "requests"] +protobuf = ["protobuf", "requests"] +schema-registry = ["requests"] + [[package]] name = "coverage" version = "7.6.1" @@ -293,6 +355,63 @@ files = [ pyreadline = {version = "*", markers = "platform_system == \"Windows\""} pyrepl = ">=0.8.2" +[[package]] +name = "fastavro" +version = "1.9.7" +description = "Fast read/write of AVRO files" +optional = false +python-versions = ">=3.8" +files = [ + {file = "fastavro-1.9.7-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:cc811fb4f7b5ae95f969cda910241ceacf82e53014c7c7224df6f6e0ca97f52f"}, + {file = "fastavro-1.9.7-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fb8749e419a85f251bf1ac87d463311874972554d25d4a0b19f6bdc56036d7cf"}, + {file = "fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:0b2f9bafa167cb4d1c3dd17565cb5bf3d8c0759e42620280d1760f1e778e07fc"}, + {file = "fastavro-1.9.7-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:e87d04b235b29f7774d226b120da2ca4e60b9e6fdf6747daef7f13f218b3517a"}, + {file = "fastavro-1.9.7-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:b525c363e267ed11810aaad8fbdbd1c3bd8837d05f7360977d72a65ab8c6e1fa"}, + {file = "fastavro-1.9.7-cp310-cp310-win_amd64.whl", hash = "sha256:6312fa99deecc319820216b5e1b1bd2d7ebb7d6f221373c74acfddaee64e8e60"}, + {file = "fastavro-1.9.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:ec8499dc276c2d2ef0a68c0f1ad11782b2b956a921790a36bf4c18df2b8d4020"}, + {file = "fastavro-1.9.7-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:76d9d96f98052615ab465c63ba8b76ed59baf2e3341b7b169058db104cbe2aa0"}, + {file = "fastavro-1.9.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:919f3549e07a8a8645a2146f23905955c35264ac809f6c2ac18142bc5b9b6022"}, + {file = "fastavro-1.9.7-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:9de1fa832a4d9016724cd6facab8034dc90d820b71a5d57c7e9830ffe90f31e4"}, + {file = "fastavro-1.9.7-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1d09227d1f48f13281bd5ceac958650805aef9a4ef4f95810128c1f9be1df736"}, + {file = "fastavro-1.9.7-cp311-cp311-win_amd64.whl", hash = "sha256:2db993ae6cdc63e25eadf9f93c9e8036f9b097a3e61d19dca42536dcc5c4d8b3"}, + {file = "fastavro-1.9.7-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:4e1289b731214a7315884c74b2ec058b6e84380ce9b18b8af5d387e64b18fc44"}, + {file = "fastavro-1.9.7-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eac69666270a76a3a1d0444f39752061195e79e146271a568777048ffbd91a27"}, + {file = "fastavro-1.9.7-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9be089be8c00f68e343bbc64ca6d9a13e5e5b0ba8aa52bcb231a762484fb270e"}, + {file = "fastavro-1.9.7-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:d576eccfd60a18ffa028259500df67d338b93562c6700e10ef68bbd88e499731"}, + {file = "fastavro-1.9.7-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:ee9bf23c157bd7dcc91ea2c700fa3bd924d9ec198bb428ff0b47fa37fe160659"}, + {file = "fastavro-1.9.7-cp312-cp312-win_amd64.whl", hash = "sha256:b6b2ccdc78f6afc18c52e403ee68c00478da12142815c1bd8a00973138a166d0"}, + {file = "fastavro-1.9.7-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:7313def3aea3dacface0a8b83f6d66e49a311149aa925c89184a06c1ef99785d"}, + {file = "fastavro-1.9.7-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:536f5644737ad21d18af97d909dba099b9e7118c237be7e4bd087c7abde7e4f0"}, + {file = "fastavro-1.9.7-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2af559f30383b79cf7d020a6b644c42ffaed3595f775fe8f3d7f80b1c43dfdc5"}, + {file = "fastavro-1.9.7-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:edc28ab305e3c424de5ac5eb87b48d1e07eddb6aa08ef5948fcda33cc4d995ce"}, + {file = "fastavro-1.9.7-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:ec2e96bdabd58427fe683329b3d79f42c7b4f4ff6b3644664a345a655ac2c0a1"}, + {file = "fastavro-1.9.7-cp38-cp38-win_amd64.whl", hash = "sha256:3b683693c8a85ede496ebebe115be5d7870c150986e34a0442a20d88d7771224"}, + {file = "fastavro-1.9.7-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:58f76a5c9a312fbd37b84e49d08eb23094d36e10d43bc5df5187bc04af463feb"}, + {file = "fastavro-1.9.7-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:56304401d2f4f69f5b498bdd1552c13ef9a644d522d5de0dc1d789cf82f47f73"}, + {file = "fastavro-1.9.7-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fcce036c6aa06269fc6a0428050fcb6255189997f5e1a728fc461e8b9d3e26b"}, + {file = "fastavro-1.9.7-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:17de68aae8c2525f5631d80f2b447a53395cdc49134f51b0329a5497277fc2d2"}, + {file = "fastavro-1.9.7-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:7c911366c625d0a997eafe0aa83ffbc6fd00d8fd4543cb39a97c6f3b8120ea87"}, + {file = "fastavro-1.9.7-cp39-cp39-win_amd64.whl", hash = "sha256:912283ed48578a103f523817fdf0c19b1755cea9b4a6387b73c79ecb8f8f84fc"}, + {file = "fastavro-1.9.7.tar.gz", hash = "sha256:13e11c6cb28626da85290933027cd419ce3f9ab8e45410ef24ce6b89d20a1f6c"}, +] + +[package.extras] +codecs = ["cramjam", "lz4", "zstandard"] +lz4 = ["lz4"] +snappy = ["cramjam"] +zstandard = ["zstandard"] + +[[package]] +name = "idna" +version = "3.8" +description = "Internationalized Domain Names in Applications (IDNA)" +optional = false +python-versions = ">=3.6" +files = [ + {file = "idna-3.8-py3-none-any.whl", hash = "sha256:050b4e5baadcd44d760cedbd2b8e639f2ff89bbc7a5730fcc662954303377aac"}, + {file = "idna-3.8.tar.gz", hash = "sha256:d838c2c0ed6fced7693d5e8ab8e734d5f8fda53a039c0164afb0b82e771e3603"}, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -654,6 +773,23 @@ files = [ pytest = ">=3.7.0" setuptools = "*" +[[package]] +name = "pytest-mock" +version = "3.14.0" +description = "Thin-wrapper around the mock package for easier use with pytest" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"}, + {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"}, +] + +[package.dependencies] +pytest = ">=6.2.5" + +[package.extras] +dev = ["pre-commit", "pytest-asyncio", "tox"] + [[package]] name = "pyyaml" version = "6.0.2" @@ -730,6 +866,46 @@ files = [ [package.dependencies] prompt_toolkit = ">=2.0,<=3.0.36" +[[package]] +name = "requests" +version = "2.32.3" +description = "Python HTTP for Humans." +optional = false +python-versions = ">=3.8" +files = [ + {file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"}, + {file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"}, +] + +[package.dependencies] +certifi = ">=2017.4.17" +charset-normalizer = ">=2,<4" +idna = ">=2.5,<4" +urllib3 = ">=1.21.1,<3" + +[package.extras] +socks = ["PySocks (>=1.5.6,!=1.5.7)"] +use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] + +[[package]] +name = "responses" +version = "0.25.3" +description = "A utility library for mocking out the `requests` Python library." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "responses-0.25.3-py3-none-any.whl", hash = "sha256:521efcbc82081ab8daa588e08f7e8a64ce79b91c39f6e62199b19159bea7dbcb"}, + {file = "responses-0.25.3.tar.gz", hash = "sha256:617b9247abd9ae28313d57a75880422d55ec63c29d33d629697590a034358dba"}, +] + +[package.dependencies] +pyyaml = "*" +requests = ">=2.30.0,<3.0" +urllib3 = ">=1.25.10,<3.0" + +[package.extras] +tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-PyYAML", "types-requests"] + [[package]] name = "setuptools" version = "74.1.2" @@ -786,6 +962,17 @@ files = [ {file = "tomlkit-0.13.2.tar.gz", hash = "sha256:fff5fe59a87295b278abd31bec92c15d9bc4a06885ab12bcea52c71119392e79"}, ] +[[package]] +name = "types-confluent-kafka" +version = "1.2.2" +description = "" +optional = false +python-versions = "<4.0,>=3.8" +files = [ + {file = "types_confluent_kafka-1.2.2-py3-none-any.whl", hash = "sha256:61dbcf5223d48593ec925f69ce39f63b5ec79902b3f27e496ff0e7bceeba4baf"}, + {file = "types_confluent_kafka-1.2.2.tar.gz", hash = "sha256:c1e4095ed8bd87b9f2b05f0b766ad0c2ade4e19da7e41c91542c805639d0bbef"}, +] + [[package]] name = "typing-extensions" version = "4.12.2" @@ -797,6 +984,23 @@ files = [ {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] +[[package]] +name = "urllib3" +version = "2.2.2" +description = "HTTP library with thread-safe connection pooling, file post, and more." +optional = false +python-versions = ">=3.8" +files = [ + {file = "urllib3-2.2.2-py3-none-any.whl", hash = "sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472"}, + {file = "urllib3-2.2.2.tar.gz", hash = "sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168"}, +] + +[package.extras] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +h2 = ["h2 (>=4,<5)"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] + [[package]] name = "wcwidth" version = "0.2.13" @@ -827,5 +1031,5 @@ test = ["pytest"] [metadata] lock-version = "2.0" -python-versions = ">=3.10.14" -content-hash = "0f190c56d5a7e5cd472b25fd8626ddb2c28e70256d653e0a3a8e4e2fc77f3f5c" +python-versions = ">=3.10.14, <4.0" +content-hash = "d7cec6d06cd50cd6e3908959695f4785d2bfe9cbb0a8f0b93f179d75cba0af57" diff --git a/pyproject.toml b/pyproject.toml index 977f613..19d0c05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,11 +13,13 @@ packages = [ ] [tool.poetry.dependencies] -python = ">=3.10.14" +python = ">=3.10.14, <4.0" # FIXME we need to lock this for now, given the plugins we install are # tightly coupled with the versions. Unless we require users to add # these plugins on runtime, we can't change this. pyspark = "3.3.2" +requests = "^2.32.3" +responses = "^0.25.3" [tool.poetry.group.dev.dependencies] commitizen = "^3.29.0" @@ -26,6 +28,10 @@ mypy = "^1.11.0" pytest-coverage = "^0.0" pytest-dependency = "^0.6.0" pytest = "^8.3.1" +pytest-mock = "^3.14.0" +confluent-kafka = "^2.5.3" +fastavro = "^1.9.7" +types-confluent-kafka = "^1.2.2" [tool.commitizen] version = "1.0.0" diff --git a/src/sparkle/config/kafka_config.py b/src/sparkle/config/kafka_config.py index ed0d4e2..3422126 100644 --- a/src/sparkle/config/kafka_config.py +++ b/src/sparkle/config/kafka_config.py @@ -1,6 +1,19 @@ +from enum import Enum from dataclasses import dataclass +class SchemaFormat(Enum): + """Enumeration for different schema types. 
+ + Attributes: + RAW (str): Raw schema type. + AVRO (str): Avro schema type. + """ + + raw = "raw" + avro = "avro" + + @dataclass(frozen=True) class Credentials: """Credentials for external services. @@ -21,10 +34,12 @@ class SchemaRegistryConfig: Attributes: url (str): URL of the Schema Registry. credentials (Credentials): Credentials for accessing the Schema Registry. + schema_format (SchemaFormat): Format of schema to use with the Schema Registry. """ url: str credentials: Credentials + schema_format: SchemaFormat @dataclass(frozen=True) diff --git a/src/sparkle/reader/kafka_reader.py b/src/sparkle/reader/kafka_reader.py new file mode 100644 index 0000000..1936412 --- /dev/null +++ b/src/sparkle/reader/kafka_reader.py @@ -0,0 +1,125 @@ +from typing import Any +from pyspark.sql import SparkSession, DataFrame +from sparkle.config import Config +from sparkle.config.kafka_config import SchemaFormat +from sparkle.reader.schema_registry import SchemaRegistry +from sparkle.utils.spark import parse_by_avro + + +class KafkaReader: + """KafkaReader is a reader for streaming data from Kafka using Spark. + + This class allows you to read data from a specified Kafka topic, with support + for Avro format parsing using a schema registry. + + Attributes: + spark (SparkSession): Spark session to be used for reading data. + topic (str): Kafka topic to read from. + schema_registry (SchemaRegistry): Schema registry client for fetching Avro schemas. + schema_version (str): Version of the schema to use for Avro parsing. + format_ (SchemaFormat): The format of the schema (e.g., Avro) used for parsing data. + kafka_options (Dict[str, Any]): Dictionary containing Kafka configuration options for Spark. + """ + + def __init__( + self, + spark: SparkSession, + topic: str, + schema_registry: SchemaRegistry, + format_: SchemaFormat = SchemaFormat.avro, + schema_version: str = "latest", + kafka_spark_options: dict[str, Any] = {}, + ): + """Initializes KafkaReader with configuration, Spark session, topic, and schema registry. + + Args: + spark (SparkSession): Spark session to be used for reading data. + topic (str): Kafka topic to read from. + schema_registry (SchemaRegistry): Schema registry client for fetching Avro schemas. + format_ (SchemaFormat, optional): The format of the schema (e.g., Avro) used for parsing data. + Defaults to SchemaFormat.avro. + schema_version (str, optional): Schema version to use for reading data. Defaults to "latest". + kafka_spark_options (Dict[str, Any], optional): Dictionary containing Kafka configuration options + for Spark. Defaults to an empty dictionary. + """ + self.spark = spark + self.topic = topic + self.schema_registry = schema_registry + self.schema_version = schema_version + self.format_ = format_ + self.kafka_options = kafka_spark_options + + @classmethod + def with_config( + cls, config: Config, spark: SparkSession, **kwargs + ) -> "KafkaReader": + """Creates a KafkaReader instance with specific configuration. + + Args: + config (Config): Configuration object containing Kafka settings. + spark (SparkSession): Spark session to be used for reading data. + **kwargs: Additional keyword arguments, such as topic, schema registry, and schema version. + + Returns: + KafkaReader: An instance of KafkaReader configured with the provided settings. + + Raises: + ValueError: If Kafka input configuration is missing in the provided config. 
+ """ + if not config.kafka_input: + raise ValueError("Kafka input configuration is missing.") + + schema_registry = SchemaRegistry.with_config(config) + + return cls( + spark=spark, + topic=config.kafka_input.kafka_topic, + schema_registry=schema_registry, + **kwargs + ) + + def read_raw(self) -> DataFrame: + """Reads raw data from the Kafka topic as a Spark DataFrame. + + This method connects to the Kafka topic and reads data as a raw Spark + DataFrame without applying any format-specific parsing. + + Returns: + DataFrame: A Spark DataFrame containing the raw data from the Kafka topic. + """ + df = ( + self.spark.readStream.format("kafka") + .option("subscribe", self.topic) + .options(**self.kafka_options) + .load() + ) + return df + + def read(self) -> DataFrame: + """Reads data from the Kafka topic, optionally parsing it using the specified format. + + Returns: + DataFrame: A Spark DataFrame containing the data read from the Kafka topic. + """ + if self.format_ == SchemaFormat.avro: + return self.read_avro() + return self.read_raw() + + def read_avro(self) -> DataFrame: + """Reads Avro data from the Kafka topic and parses it using the schema registry. + + Returns: + DataFrame: A Spark DataFrame containing the parsed Avro data. + + Raises: + ValueError: If the topic name contains '*' or ',' characters, which are not allowed. + """ + if "*" in self.topic or "," in self.topic: + raise ValueError( + "Topic name cannot contain '*' or ',' characters. Use read_multiple method for multiple topics." + ) + + self.schema_registry.fetch_schema(self.topic, self.schema_version) + return self.read_raw().transform( + parse_by_avro(self.topic, self.schema_registry) + ) diff --git a/src/sparkle/reader/schema_registry.py b/src/sparkle/reader/schema_registry.py new file mode 100644 index 0000000..aa73fc2 --- /dev/null +++ b/src/sparkle/reader/schema_registry.py @@ -0,0 +1,182 @@ +import requests # type: ignore +from requests.auth import HTTPBasicAuth # type: ignore +from sparkle.config import Config +from enum import Enum + + +class SubjectNameStrategy(Enum): + """Enumeration of subject name strategies for schema registry.""" + + TOPIC_NAME = "topic_name" + RECORD_NAME = "record_name" + TOPIC_RECORD_NAME = "topic_record_name" + + +class SchemaRegistry: + """A client for interacting with a schema registry. + + This class provides methods to fetch, cache, and register schemas in a schema + registry using HTTP requests. It supports only the `TOPIC_NAME` strategy for + subject naming. + + Attributes: + schema_registry_url (str): URL of the schema registry. + username (str | None): Username for authentication. + password (str | None): Password for authentication. + __cache (dict[str, str]): Internal cache for storing fetched schemas. + """ + + def __init__( + self, + schema_registry_url: str, + username: str | None = None, + password: str | None = None, + subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.TOPIC_NAME, + ): + """Initializes a SchemaRegistry client. + + Args: + schema_registry_url (str): URL of the schema registry. + username (str | None, optional): Username for authentication. Defaults to None. + password (str | None, optional): Password for authentication. Defaults to None. + subject_name_strategy (SubjectNameStrategy, optional): Strategy for naming subjects. + Only `TOPIC_NAME` is supported. Defaults to `SubjectNameStrategy.TOPIC_NAME`. + + Raises: + NotImplementedError: If a strategy other than `TOPIC_NAME` is used. 
+ """ + if subject_name_strategy != SubjectNameStrategy.TOPIC_NAME: + raise NotImplementedError( + "Only SubjectNameStrategy.TOPIC_NAME is supported at the moment." + ) + + self.schema_registry_url = schema_registry_url + self.username = username + self.password = password + self.__cache: dict[str, str] = {} + + def _subject_name(self, topic: str) -> str: + """Generates the subject name based on the topic. + + Args: + topic (str): The Kafka topic name. + + Returns: + str: The subject name formatted as '-value'. + """ + return f"{topic}-value" + + @classmethod + def with_config(cls, config: Config) -> "SchemaRegistry": + """Creates a SchemaRegistry instance from a Config object. + + This method initializes a SchemaRegistry instance using configuration + details provided in the `Config` object. + + Args: + config (Config): Configuration object containing settings for the schema registry. + + Returns: + SchemaRegistry: An instance of SchemaRegistry configured with the provided settings. + + Raises: + ValueError: If the necessary Kafka input or schema registry settings are not configured. + """ + if not config.kafka_input: + raise ValueError("Kafka input is not configured.") + + if not config.kafka_input.kafka_config: + raise ValueError("Kafka config is not configured.") + + if not config.kafka_input.kafka_config.schema_registry: + raise ValueError("Schema registry is not configured.") + + if not config.kafka_input.kafka_config.schema_registry.credentials: + raise ValueError("Schema registry credentials are not configured.") + + return cls( + schema_registry_url=config.kafka_input.kafka_config.schema_registry.url, + username=config.kafka_input.kafka_config.schema_registry.credentials.username, + password=config.kafka_input.kafka_config.schema_registry.credentials.password, + ) + + @property + def _auth(self) -> HTTPBasicAuth | None: + """Generates HTTP basic authentication credentials. + + Returns: + HTTPBasicAuth | None: Authentication object if username and password are set, otherwise None. + """ + if self.username and self.password: + return HTTPBasicAuth(self.username, self.password) + return None + + def fetch_schema(self, topic: str, version: str = "latest") -> str: + """Fetches the schema from the schema registry for a given topic and version. + + This method retrieves the schema for the specified topic and version from the schema registry. + + Args: + topic (str): The Kafka topic name. + version (str, optional): The schema version to fetch. Defaults to "latest". + + Returns: + str: The fetched schema as a string. + + Raises: + ValueError: If the fetched schema is not a string. + requests.HTTPError: If the request to the schema registry fails. + """ + subject = self._subject_name(topic) + url = f"{self.schema_registry_url}/subjects/{subject}/versions/{version}" + + response = requests.get(url, auth=self._auth) + response.raise_for_status() + + schema = response.json()["schema"] + + if not isinstance(schema, str): + raise ValueError(f"Schema is not a string: {schema}") + + return schema + + def cached_schema(self, topic: str, version: str = "latest") -> str: + """Retrieves the schema from the cache or fetches it if not cached. + + Args: + topic (str): The Kafka topic name. + version (str, optional): The schema version to fetch. Defaults to "latest". + + Returns: + str: The cached or fetched schema as a string. 
+ """ + if topic not in self.__cache: + self.__cache[topic] = self.fetch_schema(topic, version) + + return self.__cache[topic] + + def put_schema(self, topic: str, schema: str) -> int: + """Registers a schema with the schema registry for a given topic. + + Args: + topic (str): The Kafka topic name. + schema (str): The schema to register as a string. + + Returns: + int: The ID of the registered schema. + + Raises: + ValueError: If the schema ID returned is not a valid integer. + requests.HTTPError: If the request to the schema registry fails. + """ + subject = self._subject_name(topic) + url = f"{self.schema_registry_url}/subjects/{subject}/versions" + + response = requests.post(url, auth=self._auth, json={"schema": schema}) + response.raise_for_status() + + schema_id = response.json()["id"] + if not isinstance(schema_id, int): + raise ValueError(f"Schema ID is not of valid type: {schema_id}") + + return schema_id diff --git a/src/sparkle/utils/spark.py b/src/sparkle/utils/spark.py index 78354c5..7009ab1 100644 --- a/src/sparkle/utils/spark.py +++ b/src/sparkle/utils/spark.py @@ -1,5 +1,8 @@ from sparkle.utils.logger import logger from pyspark.sql import SparkSession, functions as F, DataFrame +from pyspark.sql.avro.functions import from_avro +from sparkle.reader.schema_registry import SchemaRegistry +from collections.abc import Callable def table_exists( @@ -61,3 +64,57 @@ def to_kafka_dataframe(unique_identifier_column_name: str, df: DataFrame) -> Dat F.to_json(F.struct([df[f] for f in df.columns])).alias("value"), ] ) + + +def parse_by_avro( + topic: str, + schema_registry_client: SchemaRegistry, + options: dict[str, str] = dict(), +) -> Callable[[DataFrame], DataFrame]: + """Parses Kafka messages in Avro format using a schema from the schema registry. + + This function generates a transformer function that can be applied to a Spark DataFrame + containing Kafka messages. It uses a schema fetched from the schema registry to parse + the Avro-encoded message values. + + Args: + topic (str): The Kafka topic name to fetch the schema for. + schema_registry_client (SchemaRegistry): Client to interact with the schema registry. + options (dict[str, str], optional): Additional options for Avro parsing. Defaults to an empty dictionary. + + Returns: + Callable[[DataFrame], DataFrame]: A transformer function that takes a Spark DataFrame + and returns a DataFrame with parsed Avro values. + + Example: + >>> transformer = parse_by_avro("my_topic", schema_registry_client) + >>> transformed_df = transformer(input_df) + + Raises: + ValueError: If the schema cannot be fetched from the schema registry. + """ + schema = schema_registry_client.cached_schema(topic) + + # Skip the first 5 bytes of the value, which is the magic byte and + # the schema ID. The rest is the Avro value. 
+ avro_value = F.expr("substring(value, 6, length(value))") + + def transformer(df: DataFrame) -> DataFrame: + kafka_metadata = "__kafka_metadata__" + return ( + df.withColumn( + kafka_metadata, + F.struct( + df.key, + df.topic, + df.partition, + df.offset, + df.timestamp, + df.timestampType, + ), + ) + .withColumn("value", from_avro(avro_value, schema, options=options)) + .select("value.*", kafka_metadata) + ) + + return transformer diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index b6643dc..d8f8e97 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.9' - services: broker: image: apache/kafka:3.8.0 @@ -46,21 +44,3 @@ services: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - - control-center: - image: confluentinc/cp-enterprise-control-center:7.7.0 - hostname: control-center - container_name: control-center - depends_on: - - broker - - schema-registry - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 - CONFLUENT_METRICS_TOPIC_REPLICATION: 1 - PORT: 9021 diff --git a/tests/unit/reader/test_kafka_reader.py b/tests/unit/reader/test_kafka_reader.py new file mode 100644 index 0000000..050a01b --- /dev/null +++ b/tests/unit/reader/test_kafka_reader.py @@ -0,0 +1,185 @@ +from time import sleep +from typing import Any +from collections.abc import Generator +import pytest +from pyspark.sql import SparkSession, DataFrame +from confluent_kafka import Producer +from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.schema_registry import SchemaRegistryClient, Schema +from confluent_kafka.schema_registry.avro import AvroSerializer +from confluent_kafka.serialization import ( + StringSerializer, + SerializationContext, + MessageField, +) +from sparkle.reader.kafka_reader import KafkaReader, SchemaRegistry +from sparkle.config.kafka_config import SchemaFormat + +KAFKA_BROKER_URL = "localhost:9092" +SCHEMA_REGISTRY_URL = "http://localhost:8081" +TEST_TOPIC = "test-topic" + + +@pytest.fixture(scope="session") +def kafka_setup() -> Generator[str, None, None]: + """Fixture to set up Kafka environment. + + This fixture sets up a Kafka broker, creates a topic, and cleans up after tests. + + Yields: + str: The Kafka topic name used for testing. + """ + admin_client = AdminClient({"bootstrap.servers": KAFKA_BROKER_URL}) + + admin_client.create_topics( + [NewTopic(TEST_TOPIC, num_partitions=1, replication_factor=1)] + ) + + yield TEST_TOPIC + + # Cleanup + admin_client.delete_topics([TEST_TOPIC]) + + +@pytest.fixture +def kafka_producer() -> Producer: + """Fixture to create a Kafka producer using confluent-kafka. + + Returns: + Producer: Confluent Kafka producer instance. + """ + return Producer({"bootstrap.servers": KAFKA_BROKER_URL}) + + +@pytest.fixture +def schema_registry_client() -> SchemaRegistryClient: + """Fixture to create a Schema Registry client. + + Returns: + SchemaRegistryClient: A Schema Registry client connected to the Confluent Schema Registry. 
+ """ + return SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL}) + + +@pytest.fixture +def avro_serializer(schema_registry_client: SchemaRegistryClient) -> AvroSerializer: + """Fixture to create an Avro serializer using the Confluent Schema Registry. + + Args: + schema_registry_client (SchemaRegistryClient): The Schema Registry client fixture. + + Returns: + AvroSerializer: Serializer for Avro data using the provided schema registry client. + """ + schema_str = """ + { + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] + } + """ + # Register the schema with the schema registry + schema = Schema(schema_str, schema_type="AVRO") + schema_registry_client.register_schema(f"{TEST_TOPIC}-value", schema) + + return AvroSerializer(schema_registry_client, schema_str) + + +@pytest.fixture +def kafka_reader( + spark_session: SparkSession, + kafka_setup: str, +) -> KafkaReader: + """Fixture to create a KafkaReader instance with the provided schema registry client. + + Args: + spark_session (SparkSession): Spark session fixture. + kafka_setup (str): Kafka topic fixture. + + Returns: + KafkaReader: Instance of KafkaReader for testing. + """ + registry = SchemaRegistry(SCHEMA_REGISTRY_URL) + + return KafkaReader( + spark=spark_session, + topic=kafka_setup, + schema_registry=registry, + format_=SchemaFormat.avro, + schema_version="latest", + kafka_spark_options={ + "kafka.bootstrap.servers": KAFKA_BROKER_URL, + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + }, + ) + + +def produce_avro_message( + producer: Producer, + topic: str, + avro_serializer: AvroSerializer, + value: dict[str, Any], +) -> None: + """Produces an Avro encoded message to Kafka. + + Args: + producer (Producer): Confluent Kafka producer instance. + topic (str): Kafka topic to write to. + avro_serializer (AvroSerializer): Avro serializer for encoding the message. + value (Dict[str, any]): Dictionary representing the Avro value to encode and send. + """ + string_serializer = StringSerializer("utf_8") + producer.produce( + topic=topic, + key=string_serializer( + value["name"], SerializationContext(topic, MessageField.KEY) + ), + value=avro_serializer(value, SerializationContext(topic, MessageField.VALUE)), + ) + producer.flush() + + +def test_kafka_reader_avro( + spark_session: SparkSession, + kafka_reader: KafkaReader, + kafka_producer: Producer, + avro_serializer: AvroSerializer, +) -> None: + """Test KafkaReader's ability to read Avro encoded data from Kafka. + + Args: + spark_session (SparkSession): Spark session fixture. + kafka_reader (KafkaReader): KafkaReader instance fixture. + kafka_producer (Producer): Confluent Kafka producer fixture. + avro_serializer (AvroSerializer): Avro serializer fixture. + """ + value = {"name": "John Doe", "age": 30} + + # Use writeStream to test the streaming DataFrame + query = ( + kafka_reader.read() + .writeStream.format("memory") # Use an in-memory sink for testing + .queryName("kafka_test") # Name the query for querying results + .outputMode("append") + .start() + ) + + sleep(3) + produce_avro_message(kafka_producer, kafka_reader.topic, avro_serializer, value) + + # Allow the stream to process the data + query.awaitTermination(30) + + # Query the in-memory table + result_df: DataFrame = spark_session.sql("SELECT * FROM kafka_test") + results = result_df.collect() + + assert len(results) > 0, "No data was read from Kafka." 
+ assert results[0]["name"] == "John Doe" + assert results[0]["age"] == 30 + + query.stop() diff --git a/tests/unit/reader/test_schema_registry.py b/tests/unit/reader/test_schema_registry.py new file mode 100644 index 0000000..5e67040 --- /dev/null +++ b/tests/unit/reader/test_schema_registry.py @@ -0,0 +1,94 @@ +import json +import pytest +import responses +from sparkle.reader.schema_registry import SchemaRegistry + +TEST_SCHEMA_REGISTRY = "http://test-schema-registry:8081" +TEST_TOPIC = "test" +TEST_SCHEMA = { + "type": "record", + "name": "test", + "fields": [{"name": "test", "type": "string"}], +} + + +@pytest.fixture +def schema_api_response(): + """Fixture providing a mock schema response from the schema registry API.""" + return { + "subject": "test-value", + "version": 1, + "id": 1, + "schema": json.dumps({"properties": TEST_SCHEMA}), + } + + +@responses.activate +def test_fetch_schema(schema_api_response): + """Test fetching the schema from the schema registry without authentication.""" + responses.add( + responses.GET, + f"{TEST_SCHEMA_REGISTRY}/subjects/{TEST_TOPIC}-value/versions/latest", + json=schema_api_response, + status=200, + ) + + schema = SchemaRegistry(TEST_SCHEMA_REGISTRY).fetch_schema("test") + + assert json.loads(schema) == { + "properties": { + "type": "record", + "name": "test", + "fields": [{"name": "test", "type": "string"}], + } + } + + +@responses.activate +def test_fetch_schema_with_auth(schema_api_response): + """Test fetching the schema with basic authentication headers.""" + responses.add( + responses.GET, + f"{TEST_SCHEMA_REGISTRY}/subjects/{TEST_TOPIC}-value/versions/latest", + json=schema_api_response, + status=200, + ) + + SchemaRegistry(TEST_SCHEMA_REGISTRY, username="test", password="test").fetch_schema( + "test" + ) + assert responses.calls[0].request.headers["Authorization"] == "Basic dGVzdDp0ZXN0" + + +@responses.activate +def test_fetch_cached_schema(schema_api_response): + """Test that the schema is fetched only once and cached on subsequent requests.""" + url = f"{TEST_SCHEMA_REGISTRY}/subjects/{TEST_TOPIC}-value/versions/latest" + responses.add( + responses.GET, + url, + json=schema_api_response, + status=200, + ) + + schema_registry = SchemaRegistry(TEST_SCHEMA_REGISTRY) + schema_registry.cached_schema("test") + schema_registry.cached_schema("test") + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == url + + +@responses.activate +def test_put_schema(schema_api_response): + """Test registering a schema to the schema registry and returning the schema ID.""" + responses.add( + responses.POST, + f"{TEST_SCHEMA_REGISTRY}/subjects/{TEST_TOPIC}-value/versions", + json={"id": 1}, + status=200, + ) + + schema_id = SchemaRegistry(TEST_SCHEMA_REGISTRY).put_schema("test", TEST_SCHEMA) + + assert schema_id == 1 diff --git a/tests/unit/utils/test_spark.py b/tests/unit/utils/test_spark.py index e9f2e9e..24f8951 100644 --- a/tests/unit/utils/test_spark.py +++ b/tests/unit/utils/test_spark.py @@ -3,7 +3,12 @@ from sparkle.utils.spark import table_exists from sparkle.utils.spark import to_kafka_dataframe from tests.conftest import json_to_string -from pyspark.sql import DataFrame +from pyspark.sql import DataFrame, SparkSession, Row +from pyspark.sql.functions import col, lit, struct +from sparkle.reader.schema_registry import SchemaRegistry +from sparkle.utils.spark import parse_by_avro +from pyspark.sql.avro.functions import to_avro +from pyspark.sql import functions as F @pytest.mark.parametrize( @@ -81,3 +86,51 @@ def 
test_generate_kafka_acceptable_dataframe(user_dataframe: DataFrame, spark_se assert df.count() == expected_df.count() assert expected_df.join(df, ["key"]).count() == expected_df.count() assert expected_df.join(df, ["value"]).count() == expected_df.count() + + +@pytest.fixture +def mock_schema_registry(mocker): + """Fixture to create a mock schema registry client.""" + mock = mocker.Mock(spec=SchemaRegistry) + mock.cached_schema.return_value = ( + '{"type": "record", "name": "test", "fields":' + '[{"name": "test", "type": "string"}]}' + ) + return mock + + +def test_parse_by_avro(spark_session: SparkSession, mock_schema_registry): + """Test the parse_by_avro function with a mock schema registry and sample DataFrame.""" + schema = mock_schema_registry.cached_schema() + + # Create a DataFrame with a struct matching the Avro schema + data = [Row(test="value1")] + df = spark_session.createDataFrame(data).select(struct(col("test")).alias("value")) + + # Convert the DataFrame to Avro format + avro_df = df.select(to_avro(col("value"), schema).alias("value")) + + # Simulate Kafka message structure + kafka_data = ( + avro_df.withColumn("key", lit(b"key1").cast("binary")) + .withColumn("topic", lit("test-topic")) + .withColumn("partition", lit(0)) + .withColumn("offset", lit(1)) + .withColumn("timestamp", lit(1000)) + .withColumn("timestampType", lit(1)) + ) + + # Add magic byte and schema ID to simulate real Kafka Avro messages + kafka_data = kafka_data.withColumn( + "value", F.concat(F.lit(b"\x00\x00\x00\x00\x01"), col("value")) + ) + + # Create the transformer function using the parse_by_avro function + transformer = parse_by_avro("test-topic", mock_schema_registry) + transformed_df = transformer(kafka_data) + + # Check the schema and contents of the transformed DataFrame + transformed_df.show(truncate=False) + assert "test" in transformed_df.columns + assert "__kafka_metadata__" in transformed_df.columns + assert transformed_df.select(col("test")).collect()[0]["test"] == "value1" diff --git a/tests/unit/writer/test_kafka_writer.py b/tests/unit/writer/test_kafka_writer.py index 4b7d597..717de02 100644 --- a/tests/unit/writer/test_kafka_writer.py +++ b/tests/unit/writer/test_kafka_writer.py @@ -6,6 +6,7 @@ from pyspark.sql.functions import floor, rand from pyspark.sql import DataFrame from pyspark.sql import SparkSession +import time @pytest.fixture @@ -23,7 +24,7 @@ def kafka_config() -> dict[str, Any]: "kafka.security.protocol": "PLAINTEXT", }, "checkpoint_location": "./tmp/checkpoint", - "kafka_topic": "test_topic", + "kafka_topic": "test-kafka-writer-topic", "output_mode": "append", "unique_identifier_column_name": "id", "trigger_once": True, @@ -108,6 +109,8 @@ def test_kafka_stream_publisher_write( except Exception as e: pytest.fail(f"KafkaStreamPublisher write failed with exception: {e}") + # Wait to make sure commit file is created + time.sleep(5) checkpoint_dir = kafka_config["checkpoint_location"] commit_file_path = os.path.join(checkpoint_dir, "commits", "0") assert os.path.exists(commit_file_path), "Commit file does not exist"
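
For context, below is a minimal sketch (not part of the diff above) of how the new KafkaReader and SchemaRegistry classes introduced in src/sparkle/reader/ might be wired together. It assumes a SparkSession whose classpath already includes the Kafka and Avro connectors, and it reuses the broker and schema-registry addresses that the unit tests point at (localhost:9092 and http://localhost:8081); adjust these for a real deployment.

# Illustrative only: a hedged sketch of the API added in this change set.
from pyspark.sql import SparkSession

from sparkle.config.kafka_config import SchemaFormat
from sparkle.reader.kafka_reader import KafkaReader
from sparkle.reader.schema_registry import SchemaRegistry

# Assumption: the session is built elsewhere with the Kafka and Avro
# connector packages available (as the test fixtures rely on).
spark = SparkSession.builder.appName("kafka-reader-sketch").getOrCreate()

# Schema registry client; basic-auth headers are only sent when both a
# username and a password are supplied.
registry = SchemaRegistry("http://localhost:8081")

reader = KafkaReader(
    spark=spark,
    topic="test-topic",
    schema_registry=registry,
    format_=SchemaFormat.avro,  # SchemaFormat.raw skips Avro parsing (read_raw)
    schema_version="latest",
    kafka_spark_options={"kafka.bootstrap.servers": "localhost:9092"},
)

# read() returns a streaming DataFrame: the Avro payload is flattened into
# columns, plus a __kafka_metadata__ struct (key, topic, partition, offset,
# timestamp, timestampType), as produced by parse_by_avro.
query = (
    reader.read()
    .writeStream.format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination(30)
query.stop()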