
Feature/kafka reader #16

Merged (12 commits, Sep 16, 2024)
38 changes: 32 additions & 6 deletions .github/workflows/test.yml
@@ -7,19 +7,45 @@ on:
- main

jobs:
tests:
strategy:
fail-fast: false # Prevents the matrix from failing fast
matrix:
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
  # macOS and Ubuntu tests are separated so that the Ubuntu tests run on both
  # PRs and main, while the macOS tests run only on the main branch.
ubuntu_tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: cachix/install-nix-action@v26
- uses: cachix/cachix-action@v14
with:
name: devenv

- name: Install devenv.sh
run: nix profile install nixpkgs#devenv

- name: Build the devenv shell and run any pre-commit hooks
run: devenv test

macos_tests:
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }}
runs-on: macos-latest
steps:
- uses: actions/checkout@v4

- name: Setup Docker (macOS) with Colima
run: |
brew install docker docker-compose colima
colima delete
colima start --arch x86_64
echo $SHELL
sudo ln -sf $HOME/.colima/default/docker.sock /var/run/docker.sock
docker --version
curl -s --unix-socket $HOME/.colima/default/docker.sock http/_ping

- uses: cachix/install-nix-action@v26
- uses: cachix/cachix-action@v14
with:
name: devenv

- name: Install devenv.sh
run: nix profile install nixpkgs#devenv

25 changes: 25 additions & 0 deletions .test.sh
@@ -1,5 +1,30 @@
#!/usr/bin/env bash
set -euo pipefail

SERVICE_NAME="schema-registry"

# Function to check if the container is healthy
check_health() {
  STATUS=$(docker inspect --format='{{.State.Health.Status}}' "$SERVICE_NAME" 2>/dev/null)

  if [ "$STATUS" == "healthy" ]; then
    echo "Container '$SERVICE_NAME' is healthy."
    return 0
  else
    echo "Container '$SERVICE_NAME' is not healthy. Current status: $STATUS"
    return 1
  fi
}

# Loop until the container is healthy
while true; do
if check_health; then
break
else
echo "Retrying in 5 seconds..."
sleep 5
fi
done

python -m mypy .
python -m pytest -vv
4 changes: 3 additions & 1 deletion devenv.nix
@@ -75,7 +75,9 @@ in
languages.java.jdk.package = pkgs.jdk8; # Java version running on AWS Glue

processes = {
kafka-test.exec = "docker compose -f tests/docker-compose.yml up --build";
kafka-test.exec = ''
docker compose -f tests/docker-compose.yml up --build
'';
};

enterShell = ''
208 changes: 206 additions & 2 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion pyproject.toml
@@ -13,11 +13,13 @@ packages = [
]

[tool.poetry.dependencies]
python = ">=3.10.14"
python = ">=3.10.14, <4.0"
# FIXME we need to lock this for now, given the plugins we install are
# tightly coupled with the versions. Unless we require users to add
# these plugins on runtime, we can't change this.
pyspark = "3.3.2"
requests = "^2.32.3"
responses = "^0.25.3"

[tool.poetry.group.dev.dependencies]
commitizen = "^3.29.0"
@@ -26,6 +28,10 @@ mypy = "^1.11.0"
pytest-coverage = "^0.0"
pytest-dependency = "^0.6.0"
pytest = "^8.3.1"
pytest-mock = "^3.14.0"
confluent-kafka = "^2.5.3"
fastavro = "^1.9.7"
types-confluent-kafka = "^1.2.2"

[tool.commitizen]
version = "1.0.0"
15 changes: 15 additions & 0 deletions src/sparkle/config/kafka_config.py
@@ -1,6 +1,19 @@
from enum import Enum
from dataclasses import dataclass


class SchemaFormat(Enum):
"""Enumeration for different schema types.

Attributes:
        raw (str): Raw (unparsed) schema format.
        avro (str): Avro schema format.
"""

raw = "raw"
avro = "avro"


@dataclass(frozen=True)
class Credentials:
"""Credentials for external services.
@@ -21,10 +34,12 @@ class SchemaRegistryConfig:
Attributes:
url (str): URL of the Schema Registry.
credentials (Credentials): Credentials for accessing the Schema Registry.
schema_format (SchemaFormat): Format of schema to use with the Schema Registry.
"""

url: str
credentials: Credentials
schema_format: SchemaFormat


@dataclass(frozen=True)
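For illustration, a minimal sketch of how the new schema_format field slots into a SchemaRegistryConfig. The Credentials fields are not visible in this diff, so the username/password arguments below (and the URL) are assumptions, not values taken from the PR.

from sparkle.config.kafka_config import Credentials, SchemaFormat, SchemaRegistryConfig

# Credentials' fields are not shown in this diff; username/password are assumed here
# purely for illustration.
credentials = Credentials(username="registry-user", password="registry-secret")

registry_config = SchemaRegistryConfig(
    url="http://localhost:8081",
    credentials=credentials,
    schema_format=SchemaFormat.avro,  # the field introduced in this PR
)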
125 changes: 125 additions & 0 deletions src/sparkle/reader/kafka_reader.py
@@ -0,0 +1,125 @@
from typing import Any
from pyspark.sql import SparkSession, DataFrame
from sparkle.config import Config
from sparkle.config.kafka_config import SchemaFormat
from sparkle.reader.schema_registry import SchemaRegistry
from sparkle.utils.spark import parse_by_avro


class KafkaReader:
"""KafkaReader is a reader for streaming data from Kafka using Spark.

This class allows you to read data from a specified Kafka topic, with support
for Avro format parsing using a schema registry.

Attributes:
spark (SparkSession): Spark session to be used for reading data.
topic (str): Kafka topic to read from.
schema_registry (SchemaRegistry): Schema registry client for fetching Avro schemas.
schema_version (str): Version of the schema to use for Avro parsing.
format_ (SchemaFormat): The format of the schema (e.g., Avro) used for parsing data.
kafka_options (Dict[str, Any]): Dictionary containing Kafka configuration options for Spark.
"""

def __init__(
self,
spark: SparkSession,
topic: str,
schema_registry: SchemaRegistry,
format_: SchemaFormat = SchemaFormat.avro,
schema_version: str = "latest",
kafka_spark_options: dict[str, Any] = {},
):
"""Initializes KafkaReader with configuration, Spark session, topic, and schema registry.

Args:
spark (SparkSession): Spark session to be used for reading data.
topic (str): Kafka topic to read from.
schema_registry (SchemaRegistry): Schema registry client for fetching Avro schemas.
format_ (SchemaFormat, optional): The format of the schema (e.g., Avro) used for parsing data.
Defaults to SchemaFormat.avro.
schema_version (str, optional): Schema version to use for reading data. Defaults to "latest".
kafka_spark_options (Dict[str, Any], optional): Dictionary containing Kafka configuration options
for Spark. Defaults to an empty dictionary.
"""
self.spark = spark
self.topic = topic
self.schema_registry = schema_registry
self.schema_version = schema_version
self.format_ = format_
self.kafka_options = kafka_spark_options

@classmethod
def with_config(
cls, config: Config, spark: SparkSession, **kwargs
) -> "KafkaReader":
"""Creates a KafkaReader instance with specific configuration.

Args:
config (Config): Configuration object containing Kafka settings.
spark (SparkSession): Spark session to be used for reading data.
            **kwargs: Additional keyword arguments forwarded to the constructor, such as
                format_, schema_version, or kafka_spark_options.

Returns:
KafkaReader: An instance of KafkaReader configured with the provided settings.

Raises:
ValueError: If Kafka input configuration is missing in the provided config.
"""
if not config.kafka_input:
raise ValueError("Kafka input configuration is missing.")

schema_registry = SchemaRegistry.with_config(config)

return cls(
spark=spark,
topic=config.kafka_input.kafka_topic,
schema_registry=schema_registry,
**kwargs
)

def read_raw(self) -> DataFrame:
"""Reads raw data from the Kafka topic as a Spark DataFrame.

This method connects to the Kafka topic and reads data as a raw Spark
DataFrame without applying any format-specific parsing.

Returns:
DataFrame: A Spark DataFrame containing the raw data from the Kafka topic.
"""
df = (
self.spark.readStream.format("kafka")
.option("subscribe", self.topic)
.options(**self.kafka_options)
.load()
)
return df

def read(self) -> DataFrame:
"""Reads data from the Kafka topic, optionally parsing it using the specified format.

Returns:
DataFrame: A Spark DataFrame containing the data read from the Kafka topic.
"""
if self.format_ == SchemaFormat.avro:
return self.read_avro()
return self.read_raw()

def read_avro(self) -> DataFrame:
"""Reads Avro data from the Kafka topic and parses it using the schema registry.

Returns:
DataFrame: A Spark DataFrame containing the parsed Avro data.

Raises:
ValueError: If the topic name contains '*' or ',' characters, which are not allowed.
"""
if "*" in self.topic or "," in self.topic:
raise ValueError(
"Topic name cannot contain '*' or ',' characters. Use read_multiple method for multiple topics."
)

        # The fetched schema is not used directly here; fetching it up front should surface
        # a missing topic/version schema before the stream is started.
        self.schema_registry.fetch_schema(self.topic, self.schema_version)
return self.read_raw().transform(
parse_by_avro(self.topic, self.schema_registry)
)
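For context, a minimal usage sketch of the new reader. The load_config helper and the bootstrap-servers address are placeholders, not part of this PR; the rest follows the signatures added above.

from pyspark.sql import SparkSession

from sparkle.config import Config
from sparkle.reader.kafka_reader import KafkaReader

spark = SparkSession.builder.appName("kafka-reader-example").getOrCreate()

# Hypothetical helper: build the project Config however your application does it.
config: Config = load_config()

reader = KafkaReader.with_config(
    config,
    spark,
    schema_version="latest",
    kafka_spark_options={"kafka.bootstrap.servers": "localhost:9092"},
)

# read() dispatches to read_avro() when format_ is SchemaFormat.avro, otherwise read_raw().
df = reader.read()

# Streaming DataFrames need a sink to run; the console sink is convenient for local checks.
query = df.writeStream.format("console").start()
query.awaitTermination()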