Merge pull request #19 from DataChefHQ/feature/14-add-kafka-batch-publisher

feat: add KafkaBatchPublisher

farbodahm authored Oct 3, 2024
2 parents 36a2c91 + f992d87 commit 91901f7
Showing 10 changed files with 355 additions and 88 deletions.
6 changes: 5 additions & 1 deletion devenv.nix
@@ -15,6 +15,7 @@ let
black
pylint
];
compose-path = "./tests/docker-compose.yml";
in
{
name = "sparkle";
@@ -50,6 +51,9 @@ in
scripts.down.exec = "devenv processes down";
scripts.down.description = "Stop processes.";

scripts.cleanup.exec = "docker compose -f ${compose-path} rm -vf";
scripts.cleanup.description = "Remove unused docker containers and volumes.";

scripts.show.exec = ''
GREEN="\033[0;32m";
YELLOW="\033[33m";
@@ -103,7 +107,7 @@ in

processes = {
kafka-test.exec = ''
docker compose -f tests/docker-compose.yml up --build
docker compose -f ${compose-path} up --build
'';
};

33 changes: 32 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

16 changes: 15 additions & 1 deletion pyproject.toml
@@ -32,6 +32,7 @@ pytest-mock = "^3.14.0"
confluent-kafka = "^2.5.3"
fastavro = "^1.9.7"
types-confluent-kafka = "^1.2.2"
chispa = "^0.10.1"

[tool.commitizen]
version = "0.5.1"
@@ -71,7 +72,20 @@ source = [
skip_empty = true

[tool.pytest.ini_options]
addopts = "-vv --tb=auto --disable-warnings"
addopts = "-v --tb=short -ra --no-header --show-capture=log"
# -v: add sufficient verbosity without being overwhelming
# --tb=short: show failing line and related context without printing all function code
# -ra: small recap at the end of pytest outputs excluding passed tests
# --no-header: skip pytest header
# --show-capture=log: reduce output clutter by capturing only logging calls
log_level = "info"
pythonpath = [
"src"
]
markers = [
"wip: mark tests as work in progress",
]

[[tool.mypy.overrides]]
module = "chispa.*"
ignore_missing_imports = true
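
Usage note (not part of this diff): the `wip` marker registered above can be attached to in-progress tests and selected with `pytest -m wip`. A minimal sketch; the test name and body are illustrative:

```python
import pytest


@pytest.mark.wip  # registered in pyproject.toml above; select with `pytest -m wip`
def test_kafka_batch_publisher_roundtrip():
    """Placeholder for a work-in-progress test."""
    assert True
```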
96 changes: 92 additions & 4 deletions src/sparkle/writer/kafka_writer.py
@@ -1,5 +1,7 @@
from typing import Any
from pyspark.sql import SparkSession, DataFrame

from pyspark.sql import DataFrame, SparkSession

from sparkle.config import Config
from sparkle.utils.spark import to_kafka_dataframe
from sparkle.writer import Writer
@@ -51,9 +53,7 @@ def __init__(
self.trigger_once = trigger_once

@classmethod
def with_config(
cls, config: Config, spark: SparkSession, **kwargs: Any
) -> "KafkaStreamPublisher":
def with_config(cls, config: Config, spark: SparkSession, **kwargs: Any) -> "KafkaStreamPublisher":
"""Create a KafkaStreamPublisher object with a configuration.
Args:
@@ -109,3 +109,91 @@ def write(self, df: DataFrame) -> None:
.option("topic", self.kafka_topic)
.start()
)


class KafkaBatchPublisher(Writer):
"""KafkaBatchublisher class for writing DataFrames in batch to Kafka.
Inherits from the Writer abstract base class and implements the write
method for writing data to Kafka topics.
Args:
kafka_options (dict[str, Any]): Kafka connection options.
kafka_topic (str): Kafka topic to which data will be written.
unique_identifier_column_name (str): Column name used as the Kafka key.
spark (SparkSession): Spark session instance to use.
"""

def __init__(
self,
kafka_options: dict[str, Any],
kafka_topic: str,
unique_identifier_column_name: str,
spark: SparkSession,
) -> None:
"""Initialize the KafkaBatchPublisher object.
Args:
kafka_options (dict[str, Any]): Kafka options for the connection.
kafka_topic (str): The target Kafka topic for writing data.
unique_identifier_column_name (str): Column name to be used as Kafka key.
spark (SparkSession): The Spark session to be used for writing data.
"""
super().__init__(spark)
self.kafka_options = kafka_options
self.kafka_topic = kafka_topic
self.unique_identifier_column_name = unique_identifier_column_name

@classmethod
def with_config(cls, config: Config, spark: SparkSession, **kwargs: Any) -> "KafkaBatchPublisher":
"""Create a KafkaBatchPublisher object with a configuration.
Args:
config (Config): Configuration object containing settings for the writer.
spark (SparkSession): The Spark session to be used for writing data.
**kwargs (Any): Additional keyword arguments.
Returns:
KafkaBatchPublisher: An instance configured with the provided settings.
"""
if not config.kafka_output:
raise ValueError("Kafka output configuration is missing")

c = config.kafka_output

return cls(
kafka_options=c.kafka_config.spark_kafka_config,
kafka_topic=c.kafka_topic,
unique_identifier_column_name=c.unique_identifier_column_name,
spark=spark,
)

def write(self, df: DataFrame) -> None:
"""Write DataFrame to Kafka by converting it to JSON using the configured primary key.
This method transforms the DataFrame using the unique identifier column name
and writes it to the configured Kafka topic.
Args:
df (DataFrame): The DataFrame to be written.
Raises:
KeyError: If the DataFrame does not have the required 'key' and 'value' columns.
"""
# Convert the DataFrame to a Kafka-friendly format
kafka_df = to_kafka_dataframe(self.unique_identifier_column_name, df)

if set(kafka_df.columns) != {"key", "value"}:
raise KeyError(
"The DataFrame must contain 'key' and 'value' columns. "
"Ensure that `to_kafka_dataframe` transformation is correctly applied."
)

# fmt: off
(
kafka_df.write.format("kafka")
.options(**self.kafka_options)
.option("topic", self.kafka_topic)
.save()
)
# fmt: on
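
Usage sketch (not part of this diff) showing how the new `KafkaBatchPublisher` could be driven directly. The broker address, topic, and column names below are illustrative assumptions; `kafka.bootstrap.servers` is the standard option key for Spark's Kafka sink, and the Spark Kafka connector must be on the classpath.

```python
from pyspark.sql import SparkSession

from sparkle.writer.kafka_writer import KafkaBatchPublisher

# Requires the spark-sql-kafka connector package on the Spark classpath.
spark = SparkSession.builder.master("local[*]").appName("batch-publish-example").getOrCreate()

publisher = KafkaBatchPublisher(
    kafka_options={"kafka.bootstrap.servers": "localhost:9092"},  # assumed local broker
    kafka_topic="users",                                          # illustrative topic name
    unique_identifier_column_name="email",                        # column used as the Kafka key
    spark=spark,
)

df = spark.createDataFrame(
    [("John", "Doe", "12345", "john@example.com")],
    schema=["name", "surname", "phone", "email"],
)

publisher.write(df)  # one Kafka record per row, keyed by `email`
```

Alternatively, `KafkaBatchPublisher.with_config(config, spark)` builds the same writer from a `Config` whose `kafka_output` section is populated, as shown in the class above.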
86 changes: 64 additions & 22 deletions tests/conftest.py
@@ -1,9 +1,14 @@
import pytest
from typing import Any
import io
import json
import logging
import os
from pyspark.sql import SparkSession
import shutil
from contextlib import redirect_stdout
from typing import Any

import pytest
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession


@pytest.fixture(scope="session")
@@ -45,18 +50,51 @@ def spark_session() -> SparkSession:
for key, value in LOCAL_CONFIG.items():
spark_conf.set(key, str(value))

spark_session = (
SparkSession.builder.master("local[*]")
.appName("LocalTestSparkleApp")
.config(conf=spark_conf)
)
spark_session = SparkSession.builder.master("local[*]").appName("LocalTestSparkleApp").config(conf=spark_conf)

if ivy_settings_path:
spark_session.config("spark.jars.ivySettings", ivy_settings_path)

return spark_session.getOrCreate()


@pytest.fixture(scope="session")
def checkpoint_directory():
"""Fixture to validate and remove the checkpoint directory after tests.
To avoid test failures due to non-unique directories, the user should add a
subdirectory to this path when using this fixture.
Example:
>>> dir = checkpoint_directory + subdir
"""
checkpoint_dir = "/tmp/checkpoint/"

yield checkpoint_dir

# Remove the checkpoint directory if it exists
if os.path.exists(checkpoint_dir):
shutil.rmtree(checkpoint_dir)
logging.info(f"Checkpoint directory {checkpoint_dir} has been removed.")
else:
logging.warning(f"Checkpoint directory {checkpoint_dir} was not found.")


@pytest.fixture(scope="session", autouse=True)
def cleanup_logging_handlers():
"""Fixture to cleanup logging handlers after tests.
Prevents logging errors at the end of the report.
Taken from [here](https://github.com/pytest-dev/pytest/issues/5502#issuecomment-1803676152)
"""
try:
yield
finally:
for handler in logging.root.handlers[:]:
if isinstance(handler, logging.StreamHandler):
logging.root.removeHandler(handler)


@pytest.fixture
def user_dataframe(spark_session: SparkSession):
"""Fixture for creating a DataFrame with user data.
@@ -71,21 +109,11 @@ def user_dataframe(spark_session: SparkSession):
pyspark.sql.DataFrame: A Spark DataFrame with sample user data.
"""
data = [
{
"name": "John",
"surname": "Doe",
"phone": "12345",
"email": "[email protected]",
},
{
"name": "Jane",
"surname": "Doe",
"phone": "12345",
"email": "[email protected]",
},
["John", "Doe", "12345", "[email protected]"],
["Jane", "Doe", "12345", "[email protected]"],
]

return spark_session.createDataFrame(data)
schema = ["name", "surname", "phone", "email"]
return spark_session.createDataFrame(data, schema=schema)


@pytest.fixture
@@ -127,3 +155,17 @@ def json_to_string(dictionary: dict[str, Any]) -> str:
ensure_ascii=True,
separators=(",", ":"),
).replace("\n", "")


def log_spark_dataframe(df: DataFrame, *, truncate: bool = False, name: str = "") -> None:
"""Logs the contents of a Spark DataFrame in tabular format.
Useful when Pytest is configured to capture only logs, so `df.show()` won't work.
Example:
>>> log_spark_dataframe(df, name="My DataFrame")
"""
buffer = io.StringIO()
with redirect_stdout(buffer):
df.show(truncate=truncate)
logging.info(f"\n{name}\n{buffer.getvalue()}")
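
Illustrative test (not part of this diff) combining the `user_dataframe` fixture with `log_spark_dataframe`. Importing from `conftest` relies on pytest's default "prepend" import mode and is an assumption, as is the test file name:

```python
# tests/test_user_dataframe_example.py (hypothetical file)
from conftest import log_spark_dataframe


def test_user_dataframe_shape(user_dataframe):
    # `--show-capture=log` hides stdout, so log the frame instead of calling df.show().
    log_spark_dataframe(user_dataframe, name="user_dataframe fixture")

    # Two rows with the explicit column list used by the fixture above.
    assert user_dataframe.columns == ["name", "surname", "phone", "email"]
    assert user_dataframe.count() == 2
```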
11 changes: 11 additions & 0 deletions tests/docker-compose.yml
@@ -44,3 +44,14 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
DYNAMIC_CONFIG_ENABLED: true
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
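
Sanity-check sketch (not part of this diff): once the compose stack is up, the schema registry can be queried with the `confluent-kafka` dev dependency. This assumes port 8081 is published to the host; that mapping sits in the part of the compose file not shown in this excerpt.

```python
from confluent_kafka.schema_registry import SchemaRegistryClient

# Assumed host-side URL; the registry listens on 0.0.0.0:8081 inside the container.
client = SchemaRegistryClient({"url": "http://localhost:8081"})
print(client.get_subjects())  # [] on a freshly started stack
```

The new kafka-ui container itself is reachable at http://localhost:8080 per the port mapping above.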