diff --git a/src/sparkle/application/__init__.py b/src/sparkle/application/__init__.py
index 336a567..c028426 100644
--- a/src/sparkle/application/__init__.py
+++ b/src/sparkle/application/__init__.py
@@ -23,7 +23,7 @@ def __init__(
         self,
         config: Config,
         readers: dict[str, type[Reader]],
-        writers: list[Writer],
+        writers: list[type[Writer]],
         spark_extensions: list[str] | None = None,
         spark_packages: list[str] | None = None,
         extra_spark_config: dict[str, str] | None = None,
@@ -33,14 +33,13 @@ def __init__(
         Args:
             config (Config): The configuration object containing application-specific settings.
             readers (dict[str, type[Reader]]): A dictionary of readers for input data, keyed by source name.
-            writers (list[Writer]): A list of Writer objects used to output processed data.
+            writers (list[type[Writer]]): A list of Writer classes used to output processed data.
             spark_extensions (list[str], optional): A list of Spark session extensions to apply.
             spark_packages (list[str], optional): A list of Spark packages to include in the session.
             extra_spark_config (dict[str, str], optional): Additional Spark configurations to merge with the default settings.
 
         """
         self.config = config
-        self.writers = writers
         self.readers = readers
         self.execution_env = config.execution_environment
         self.spark_config = config.get_spark_config(
@@ -50,6 +49,10 @@ def __init__(
         self.spark_packages = config.get_spark_packages(spark_packages)
 
         self.spark_session = self.get_spark_session(self.execution_env)
+        self.writers: list[Writer] = [
+            writer_class.with_config(self.config, self.spark_session)
+            for writer_class in writers
+        ]
 
     def get_spark_session(self, env: ExecutionEnvironment) -> SparkSession:
         """Creates and returns a Spark session based on the environment.
diff --git a/src/sparkle/reader/kafka_reader.py b/src/sparkle/reader/kafka_reader.py
index 97e5515..ba57d9c 100644
--- a/src/sparkle/reader/kafka_reader.py
+++ b/src/sparkle/reader/kafka_reader.py
@@ -4,9 +4,10 @@
 from sparkle.config.kafka_config import SchemaFormat
 from sparkle.reader.schema_registry import SchemaRegistry
 from sparkle.utils.spark import parse_by_avro
+from sparkle.reader import Reader
 
 
-class KafkaReader:
+class KafkaReader(Reader):
     """KafkaReader is a reader for streaming data from Kafka using Spark.
 
     This class allows you to read data from a specified Kafka topic, with support
diff --git a/src/sparkle/reader/table_reader.py b/src/sparkle/reader/table_reader.py
index f625020..2c080e1 100644
--- a/src/sparkle/reader/table_reader.py
+++ b/src/sparkle/reader/table_reader.py
@@ -1,9 +1,10 @@
 from pyspark.sql import SparkSession, DataFrame
 from sparkle.config import Config
 from sparkle.utils.logger import logger
+from sparkle.reader import Reader
 
 
-class TableReader:
+class TableReader(Reader):
     """A class for reading tables from a specified catalog using Spark.
 
     The `TableReader` class provides methods to read data from a table in a specified
@@ -44,7 +45,7 @@ def __init__(
 
     @classmethod
     def with_config(
-        cls, spark: SparkSession, config: Config, **kwargs
+        cls, config: Config, spark: SparkSession, **kwargs
     ) -> "TableReader":
         """Creates a TableReader instance using a configuration object.
 
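
A note on the application hunks: `Application` now receives writer classes (`list[type[Writer]]`) and instantiates them itself, after the Spark session exists, via `writer_class.with_config(self.config, self.spark_session)`. Below is a minimal sketch of a writer that satisfies this contract. The `ConsoleWriter` name, its constructor, and the `write` method are hypothetical; the `with_config(config, spark)` classmethod is the only part taken from the diff, and the `sparkle.writer` import path is assumed.

```python
from pyspark.sql import DataFrame, SparkSession

from sparkle.config import Config
from sparkle.writer import Writer  # assumed import path for the Writer base class


class ConsoleWriter(Writer):
    """Hypothetical writer, shown only to illustrate the with_config contract."""

    def __init__(self, spark: SparkSession, truncate: bool = True) -> None:
        self.spark = spark
        self.truncate = truncate

    @classmethod
    def with_config(cls, config: Config, spark: SparkSession, **kwargs) -> "ConsoleWriter":
        # Application.__init__ calls this with its own config and the session
        # it just created, so a writer never needs a session before one exists.
        return cls(spark=spark)

    def write(self, df: DataFrame) -> None:
        # Illustrative sink: print the batch to stdout.
        df.show(truncate=self.truncate)
```

Callers would then pass `writers=[ConsoleWriter]` rather than pre-built instances, which removes the ordering problem of constructing writers against a `SparkSession` that `Application.__init__` has not created yet.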
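
The reader hunks make `KafkaReader` and `TableReader` inherit from a shared `Reader` base in `sparkle.reader`, and reorder `TableReader.with_config` to `(config, spark)`, presumably to match that base. A rough sketch of what the base might declare follows; the abstract `read` method and the use of `ABC` are assumptions, while the `(config, spark)` argument order comes from the diff.

```python
from abc import ABC, abstractmethod

from pyspark.sql import DataFrame, SparkSession

from sparkle.config import Config


class Reader(ABC):
    """Sketch of the shared reader interface assumed to live in
    src/sparkle/reader/__init__.py; its exact members are not shown in the diff."""

    @classmethod
    @abstractmethod
    def with_config(cls, config: Config, spark: SparkSession, **kwargs) -> "Reader":
        """Build a reader from application config; note the (config, spark)
        argument order that TableReader.with_config is realigned to above."""
        ...

    @abstractmethod
    def read(self) -> DataFrame:
        """Return the source data as a DataFrame."""
        ...
```

A uniform `with_config` signature across readers and writers is what lets `Application` wire up either kind of class generically.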