Refactor/writers interface #23

Merged 2 commits on Oct 3, 2024
src/sparkle/application/__init__.py (9 changes: 6 additions & 3 deletions)

@@ -23,7 +23,7 @@ def __init__(
         self,
         config: Config,
         readers: dict[str, type[Reader]],
-        writers: list[Writer],
+        writers: list[type[Writer]],
         spark_extensions: list[str] | None = None,
         spark_packages: list[str] | None = None,
         extra_spark_config: dict[str, str] | None = None,
@@ -33,14 +33,13 @@ def __init__(
         Args:
             config (Config): The configuration object containing application-specific settings.
             readers (dict[str, type[Reader]]): A dictionary of readers for input data, keyed by source name.
-            writers (list[Writer]): A list of Writer objects used to output processed data.
+            writers (list[type[Writer]]): A list of Writer classes used to output processed data.
             spark_extensions (list[str], optional): A list of Spark session extensions to apply.
             spark_packages (list[str], optional): A list of Spark packages to include in the session.
             extra_spark_config (dict[str, str], optional): Additional Spark configurations to
                 merge with the default settings.
         """
         self.config = config
-        self.writers = writers
         self.readers = readers
         self.execution_env = config.execution_environment
         self.spark_config = config.get_spark_config(
@@ -50,6 +49,10 @@ def __init__(
         self.spark_packages = config.get_spark_packages(spark_packages)

         self.spark_session = self.get_spark_session(self.execution_env)
+        self.writers: list[Writer] = [
+            writer_class.with_config(self.config, self.spark_session)
+            for writer_class in writers
+        ]

     def get_spark_session(self, env: ExecutionEnvironment) -> SparkSession:
         """Creates and returns a Spark session based on the environment.
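The key change here: Application now receives Writer classes rather than instances, and constructs them itself via each class's with_config constructor only after the Spark session exists. A minimal sketch of a writer conforming to this interface follows; the Writer import path, the write method name, and ConsoleWriter itself are assumptions for illustration, not part of this PR:

    from pyspark.sql import DataFrame, SparkSession

    from sparkle.config import Config
    from sparkle.writer import Writer  # assumed import path for the base class


    class ConsoleWriter(Writer):
        """Hypothetical writer, shown only to illustrate the new interface."""

        def __init__(self, config: Config, spark: SparkSession) -> None:
            self.config = config
            self.spark = spark

        @classmethod
        def with_config(cls, config: Config, spark: SparkSession, **kwargs) -> "ConsoleWriter":
            # Matches the call Application.__init__ now makes:
            # writer_class.with_config(self.config, self.spark_session)
            return cls(config, spark)

        def write(self, df: DataFrame) -> None:  # assumed abstract method name
            df.show(truncate=False)


    # Callers now pass the class, not an instance:
    # app = Application(config, readers={...}, writers=[ConsoleWriter])

Passing classes instead of instances removes the chicken-and-egg problem of writers needing a Spark session that Application itself creates.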
src/sparkle/reader/kafka_reader.py (3 changes: 2 additions & 1 deletion)

@@ -4,9 +4,10 @@
 from sparkle.config.kafka_config import SchemaFormat
 from sparkle.reader.schema_registry import SchemaRegistry
 from sparkle.utils.spark import parse_by_avro
+from sparkle.reader import Reader


-class KafkaReader:
+class KafkaReader(Reader):
     """KafkaReader is a reader for streaming data from Kafka using Spark.

     This class allows you to read data from a specified Kafka topic, with support
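KafkaReader (and TableReader below) now subclass a common Reader base. The PR does not show that base class; inferring from the diffs, it presumably requires a with_config classmethod that takes config before spark. A hedged sketch of what such a contract could look like — the ABC structure and the read method are assumptions, and the real definition in sparkle.reader may differ:

    from abc import ABC, abstractmethod

    from pyspark.sql import DataFrame, SparkSession

    from sparkle.config import Config


    class Reader(ABC):
        """Sketch of the shared reader contract implied by this PR."""

        @classmethod
        @abstractmethod
        def with_config(cls, config: Config, spark: SparkSession, **kwargs) -> "Reader":
            """Build a reader from application config (config first, then spark)."""

        @abstractmethod
        def read(self) -> DataFrame:
            """Return the source data as a DataFrame."""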
src/sparkle/reader/table_reader.py (5 changes: 3 additions & 2 deletions)

@@ -1,9 +1,10 @@
 from pyspark.sql import SparkSession, DataFrame
 from sparkle.config import Config
 from sparkle.utils.logger import logger
+from sparkle.reader import Reader


-class TableReader:
+class TableReader(Reader):
     """A class for reading tables from a specified catalog using Spark.

     The `TableReader` class provides methods to read data from a table in a specified

@@ -44,7 +45,7 @@ def __init__(

     @classmethod
     def with_config(
-        cls, spark: SparkSession, config: Config, **kwargs
+        cls, config: Config, spark: SparkSession, **kwargs
     ) -> "TableReader":
         """Creates a TableReader instance using a configuration object.
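Note the swapped argument order in TableReader.with_config: config now comes first, consistent with the call Application makes on writer classes. Existing call sites must be updated to match. A hedged usage sketch, assuming a read method as in the Reader sketch above:

    from pyspark.sql import SparkSession

    from sparkle.config import Config
    from sparkle.reader.table_reader import TableReader

    spark = SparkSession.builder.getOrCreate()
    config = ...  # placeholder: construct your application's Config here

    # New argument order: config first, then the Spark session.
    reader = TableReader.with_config(config, spark)
    df = reader.read()  # assumed read method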