feat: add Kafka Avro reader
farbodahm committed Sep 11, 2024
1 parent 028a19f commit fb1736b
Showing 5 changed files with 25 additions and 13 deletions.
14 changes: 13 additions & 1 deletion pdm.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -35,6 +35,7 @@ dev = [
     "pytest-mock>=3.14.0",
     "confluent-kafka>=2.5.3",
     "fastavro>=1.9.7",
+    "types-confluent-kafka>=1.2.2",
 ]
 [tool.commitizen]
 version = "1.0.0"
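Editor's note: `types-confluent-kafka` is a third-party stub package, so adding it to the dev group lets a type checker such as mypy verify calls against `confluent_kafka` instead of treating the module as `Any`. A rough illustration of what becomes checkable (broker address is a placeholder, and exact stub coverage may vary):

```python
# With the stubs installed, mypy can check these signatures; without them,
# `confluent_kafka` resolves to Any and typos pass silently.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("my-topic", value=b"payload")
producer.flush()
```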
3 changes: 2 additions & 1 deletion src/sparkle/reader/kafka_reader.py
@@ -1,3 +1,4 @@
+from typing import Any
 from pyspark.sql import SparkSession, DataFrame
 from sparkle.config import Config
 from sparkle.reader.schema_registry import SchemaRegistry
@@ -25,7 +26,7 @@ def __init__(
         schema_registry: SchemaRegistry,
         use_avro: bool = True,
         schema_version: str = "latest",
-        kafka_spark_options: dict[str, str] = {},
+        kafka_spark_options: dict[str, Any] = {},
     ):
         """Initializes KafkaReader with configuration, Spark session, topic, and schema registry.
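Editor's note: widening `kafka_spark_options` from `dict[str, str]` to `dict[str, Any]` lets callers pass the non-string values (ints, bools) that PySpark's reader options accept and stringifies internally. A sketch of the kind of dict this now admits, applied to a plain Kafka stream read (broker, topic, and option choices are illustrative, not from this repo):

```python
from typing import Any

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Non-string values are fine here: PySpark converts them to strings itself.
kafka_spark_options: dict[str, Any] = {
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": 1000,  # int, not "1000"
    "failOnDataLoss": False,       # bool, not "false"
}

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "some-topic")
    .options(**kafka_spark_options)
    .load()
)
```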
15 changes: 5 additions & 10 deletions tests/unit/reader/test_kafka_reader.py
@@ -2,7 +2,7 @@
 from collections.abc import Generator
 import pytest
 from pyspark.sql import SparkSession, DataFrame
-from confluent_kafka import Producer, KafkaException
+from confluent_kafka import Producer
 from confluent_kafka.admin import AdminClient, NewTopic
 from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
 from confluent_kafka.schema_registry.avro import AvroSerializer
@@ -29,14 +29,9 @@ def kafka_setup() -> Generator[str, None, None]:
     """
     admin_client = AdminClient({"bootstrap.servers": KAFKA_BROKER_URL})

-    # Create topic if it does not exist
-    try:
-        admin_client.create_topics(
-            [NewTopic(TEST_TOPIC, num_partitions=1, replication_factor=1)]
-        )
-    except KafkaException as e:
-        if e.args[0].code() != KafkaException.TOPIC_ALREADY_EXISTS:
-            raise e
+    admin_client.create_topics(
+        [NewTopic(TEST_TOPIC, num_partitions=1, replication_factor=1)]
+    )

     yield TEST_TOPIC
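Editor's note: the removed guard could never have caught anything as written. `AdminClient.create_topics` is asynchronous and reports failures through the futures it returns, and `TOPIC_ALREADY_EXISTS` is a constant on `KafkaError`, not `KafkaException`. A corrected version of the original intent would look roughly like this (broker address and topic name are placeholders):

```python
from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin_client.create_topics(
    [NewTopic("my-topic", num_partitions=1, replication_factor=1)]
)

for topic, future in futures.items():
    try:
        # create_topics returns immediately; errors only surface here.
        future.result()
    except KafkaException as e:
        # The "already exists" constant lives on KafkaError.
        if e.args[0].code() != KafkaError.TOPIC_ALREADY_EXISTS:
            raise
```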
@@ -88,7 +83,7 @@ def avro_serializer(schema_registry_client: SchemaRegistryClient) -> AvroSerializer:
     schema = Schema(schema_str, schema_type="AVRO")
     schema_registry_client.register_schema(f"{TEST_TOPIC}-value", schema)

-    return AvroSerializer(schema_registry_client, schema_str, lambda obj, ctx: obj)
+    return AvroSerializer(schema_registry_client, schema_str)


 @pytest.fixture
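Editor's note: the dropped third argument is `AvroSerializer`'s optional `to_dict` callable, and `lambda obj, ctx: obj` is a no-op when the values are already dicts, so removing it changes nothing. A minimal end-to-end sketch, assuming a local broker and schema registry:

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_str = """
{"type": "record", "name": "User",
 "fields": [{"name": "name", "type": "string"}]}
"""

registry = SchemaRegistryClient({"url": "http://localhost:8081"})
# No to_dict callable: the serializer consumes plain dicts as-is.
serializer = AvroSerializer(registry, schema_str)

producer = Producer({"bootstrap.servers": "localhost:9092"})
payload = serializer(
    {"name": "Alice"},
    SerializationContext("my-topic", MessageField.VALUE),
)
producer.produce("my-topic", value=payload)
producer.flush()
```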
5 changes: 4 additions & 1 deletion tests/unit/writer/test_kafka_writer.py
@@ -6,6 +6,7 @@
 from pyspark.sql.functions import floor, rand
 from pyspark.sql import DataFrame
 from pyspark.sql import SparkSession
+import time


@pytest.fixture
@@ -23,7 +24,7 @@ def kafka_config() -> dict[str, Any]:
             "kafka.security.protocol": "PLAINTEXT",
         },
         "checkpoint_location": "./tmp/checkpoint",
-        "kafka_topic": "test_topic",
+        "kafka_topic": "test-kafka-writer-topic",
         "output_mode": "append",
         "unique_identifier_column_name": "id",
         "trigger_once": True,
@@ -108,6 +109,8 @@ def test_kafka_stream_publisher_write(
     except Exception as e:
         pytest.fail(f"KafkaStreamPublisher write failed with exception: {e}")

+    # Wait to make sure commit file is created
+    time.sleep(5)
     checkpoint_dir = kafka_config["checkpoint_location"]
     commit_file_path = os.path.join(checkpoint_dir, "commits", "0")
     assert os.path.exists(commit_file_path), "Commit file does not exist"
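
Editor's note: a fixed `time.sleep(5)` adds five seconds to every run and can still race on a slow machine. A common alternative is to poll for the commit file with a deadline; a sketch (the helper is ours, not part of this repo):

```python
import os
import time


def wait_for_path(path: str, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until `path` exists or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(path):
            return True
        time.sleep(interval)
    return False


commit_file_path = os.path.join("./tmp/checkpoint", "commits", "0")
assert wait_for_path(commit_file_path), "Commit file does not exist"
```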
