feat: refactor sparkle example #19

Merged · 4 commits · Sep 11, 2024
Changes from all commits
51 changes: 26 additions & 25 deletions examples/sparkle/__main__.py
@@ -1,35 +1,36 @@
-import pulumi
+from argparse import ArgumentParser, Namespace
 from damavand.cloud.provider import AwsProvider
 from damavand.factories import SparkControllerFactory
 
-from damavand.cloud.azure.resources import SynapseComponent, SynapseComponentArgs
+from examples.sparkle.applications.orders import CustomerOrders
+from examples.sparkle.applications.products import Products
 
 
-# def main():
-#     spark_factory = SparkControllerFactory(
-#         provider=AwsProvider(
-#             app_name="my-app",
-#             region="us-west-2",
-#         ),
-#         tags={"env": "dev"},
-#     )
-#
-#     spark = spark_factory.new(name="my-spark")
-
-
-def main() -> None:
-    spark = SynapseComponent(
-        name="my-spark",
-        args=SynapseComponentArgs(
-            jobs=[],
-            sql_admin_username="kiarashk",
-            sql_admin_password="lkjsf@123",
+def main(args: Namespace) -> None:
+    spark_factory = SparkControllerFactory(
+        provider=AwsProvider(
+            app_name="my-app",
+            region="us-west-2",
         ),
+        tags={"env": "dev"},
     )
 
-    pulumi.export(
-        "resource_group",
-        pulumi.Output.all(spark.resource_group).apply(lambda x: x[0].name),
+    spark_controller = spark_factory.new(
+        name="my-spark",
     )
 
+    spark_controller.applications = [
+        Products(spark_controller.default_session()),
+        CustomerOrders(spark_controller.default_session()),
+    ]
+
+    spark_controller.run_application(args.app_id)
 
 
 if __name__ == "__main__":
-    main()
+    arg_parser = ArgumentParser()
+    arg_parser.add_argument("--app_id", type=str, required=True)
+
+    args = arg_parser.parse_args()
+
+    main(args)
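With this refactor, the example selects which Sparkle application to run at runtime. Assuming the repository root is the working directory and on PYTHONPATH (the import paths above suggest examples is importable as a package), the entrypoint would be invoked as, for example:

python -m examples.sparkle --app_id products

where the valid --app_id values are the app_id fields configured below: products and customer_orders.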
Empty file.
34 changes: 34 additions & 0 deletions examples/sparkle/applications/orders.py
@@ -0,0 +1,34 @@
+from sparkle.config import Config
+from sparkle.writer.iceberg_writer import IcebergWriter
+from sparkle.application import Sparkle
+
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+
+
+class CustomerOrders(Sparkle):
+    def __init__(self, spark_session: SparkSession):
+        super().__init__(
+            spark_session,
+            config=Config(
+                app_name="customer-orders",
+                app_id="customer_orders",
+                version="0.1",
+                database_bucket="s3://bucket-name",
+                kafka=None,
+                input_database=None,
+                output_database=None,
+                iceberg_config=None,
+                spark_trigger='{"once": True}',
+            ),
+            writers=[
+                IcebergWriter(
+                    database_name="default",
+                    database_path="s3://bucket-name/warehouse",
+                    table_name="orders",
+                )
+            ],
+        )
+
+    def process(self) -> DataFrame:
+        return self.input["orders"].read()
34 changes: 34 additions & 0 deletions examples/sparkle/applications/products.py
@@ -0,0 +1,34 @@
+from sparkle.application import Sparkle
+from sparkle.config import Config
+from sparkle.writer.iceberg_writer import IcebergWriter
+
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+
+
+class Products(Sparkle):
+    def __init__(self, spark_session: SparkSession):
+        super().__init__(
+            spark_session,
+            config=Config(
+                app_name="products",
+                app_id="products",
+                version="0.1",
+                database_bucket="s3://bucket-name",
+                kafka=None,
+                input_database=None,
+                output_database=None,
+                iceberg_config=None,
+                spark_trigger='{"once": True}',
+            ),
+            writers=[
+                IcebergWriter(
+                    database_name="default",
+                    database_path="s3://bucket-name/warehouse",
+                    table_name="products",
+                )
+            ],
+        )
+
+    def process(self) -> DataFrame:
+        return self.input["products"].read()
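Products and CustomerOrders differ only in their names, the input table they read, and the Iceberg table they write; the Config and IcebergWriter wiring is duplicated verbatim. A possible follow-up consolidation, sketched below with hypothetical helpers default_config and default_writers (not part of this PR), would hoist the shared settings:

from sparkle.config import Config
from sparkle.writer.iceberg_writer import IcebergWriter


def default_config(app_name: str, app_id: str) -> Config:
    # Shared settings copied from the two example applications above.
    return Config(
        app_name=app_name,
        app_id=app_id,
        version="0.1",
        database_bucket="s3://bucket-name",
        kafka=None,
        input_database=None,
        output_database=None,
        iceberg_config=None,
        spark_trigger='{"once": True}',
    )


def default_writers(table_name: str) -> list[IcebergWriter]:
    # Each application writes a single Iceberg table; only the target
    # table name differs between the two applications.
    return [
        IcebergWriter(
            database_name="default",
            database_path="s3://bucket-name/warehouse",
            table_name=table_name,
        )
    ]

Each subclass's __init__ would then shrink to a single super().__init__(spark_session, config=default_config(...), writers=default_writers(...)) call.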
14 changes: 3 additions & 11 deletions src/damavand/base/controllers/spark.py
@@ -42,24 +42,21 @@ class SparkController(ApplicationController):
         Return the default cloud Spark session.
     default_session()
         Return the currently active Spark session.
-    applications()
-        Return the list of Spark applications.
     application_with_id(app_id)
         Return the Spark application with the given ID.
-    run(app_id)
+    run_application(app_id)
         Run the Spark application with the given ID.
     """
 
     def __init__(
         self,
         name,
-        applications: list[Sparkle] = [],
         id_: Optional[str] = None,
         tags: dict[str, str] = {},
         **kwargs,
     ) -> None:
         ApplicationController.__init__(self, name, id_, tags, **kwargs)
-        self.__applications = applications
+        self.applications: list[Sparkle]
 
     @property
     def _spark_extensions(self) -> list[str]:
@@ -136,11 +133,6 @@ def default_session(self) -> SparkSession:
             case _:
                 return self.default_cloud_session()
 
-    @property
-    def applications(self) -> list[Sparkle]:
-        """Return the list of Spark applications."""
-        return self.__applications
-
     def application_with_id(self, app_id: str) -> Sparkle:
         """Return the Spark application with the given ID.
 
@@ -158,7 +150,7 @@ def application_with_id(self, app_id: str) -> Sparkle:
         raise ValueError(f"Application with ID {app_id} not found.")
 
     @runtime
-    def run(self, app_id: str) -> None:
+    def run_application(self, app_id: str) -> None:
        """Run the Spark application with the given ID.
 
         Args:
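With applications now a plain attribute, the lookup in application_with_id can iterate it directly. The following is a minimal sketch of that lookup, assuming each registered Sparkle app exposes its Config as app.config (as the example applications above suggest); it is not necessarily the exact body in this PR:

def application_with_id(self, app_id: str) -> Sparkle:
    """Return the Spark application with the given ID."""
    # Assumes each Sparkle app exposes its Config as `app.config`.
    for app in self.applications:
        if app.config.app_id == app_id:
            return app

    raise ValueError(f"Application with ID {app_id} not found.")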
4 changes: 2 additions & 2 deletions src/damavand/cloud/azure/controllers/spark.py
@@ -20,12 +20,12 @@ def __init__(
         self,
         name,
         region: str,
-        applications: list[Sparkle] = [],
         id_: Optional[str] = None,
         tags: dict[str, str] = {},
         **kwargs,
     ) -> None:
-        super().__init__(name, applications, id_, tags, **kwargs)
+        super().__init__(name, id_, tags, **kwargs)
+        self.applications: list[Sparkle]
 
     @buildtime
     def admin_username(self) -> str:
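Dropping the applications: list[Sparkle] = [] parameter also removes a mutable default argument: in Python the default is built once at function definition time and shared across all calls (the remaining tags: dict[str, str] = {} default carries the same hazard). A minimal, self-contained illustration of the pitfall:

def register(app: str, apps: list[str] = []) -> list[str]:
    # The [] default is evaluated once, so every call that omits `apps`
    # appends to the same shared list object.
    apps.append(app)
    return apps


print(register("products"))          # ['products']
print(register("customer_orders"))   # ['products', 'customer_orders'] - state leaked across calls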