chore(sparkle): update example to factory controller
kkiani committed Sep 10, 2024
1 parent c81e06a commit 8d5c91c
Showing 4 changed files with 94 additions and 25 deletions.
51 changes: 26 additions & 25 deletions examples/sparkle/__main__.py
@@ -1,35 +1,36 @@
-import pulumi
+from argparse import ArgumentParser, Namespace
+from damavand.cloud.provider import AwsProvider
+from damavand.factories import SparkControllerFactory
 
-from damavand.cloud.azure.resources import SynapseComponent, SynapseComponentArgs
+from applications.orders import CustomerOrders
+from examples.sparkle.applications.products import Products
 
 
-# def main():
-#     spark_factory = SparkControllerFactory(
-#         provider=AwsProvider(
-#             app_name="my-app",
-#             region="us-west-2",
-#         ),
-#         tags={"env": "dev"},
-#     )
-#
-#     spark = spark_factory.new(name="my-spark")
-
-
-def main() -> None:
-    spark = SynapseComponent(
-        name="my-spark",
-        args=SynapseComponentArgs(
-            jobs=[],
-            sql_admin_username="kiarashk",
-            sql_admin_password="lkjsf@123",
+def main(args: Namespace) -> None:
+    spark_factory = SparkControllerFactory(
+        provider=AwsProvider(
+            app_name="my-app",
+            region="us-west-2",
         ),
+        tags={"env": "dev"},
     )
 
-    pulumi.export(
-        "resource_group",
-        pulumi.Output.all(spark.resource_group).apply(lambda x: x[0].name),
+    spark_controller = spark_factory.new(
+        name="my-spark",
     )
+
+    spark_controller.applications = [
+        Products(spark_controller.default_session()),
+        CustomerOrders(spark_controller.default_session()),
+    ]
+
+    spark_controller.run_application(id_=args.app_id)
 
 
 if __name__ == "__main__":
-    main()
+    arg_parser = ArgumentParser()
+    arg_parser.add_argument("--app_id", type=str, required=True)
+
+    args = arg_parser.parse_args()
+
+    main(args)
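The example now depends only on the controller surface exercised above: SparkControllerFactory(provider=..., tags=...), factory.new(name=...), controller.default_session(), the applications list, and controller.run_application(id_=...). A minimal stand-in for that surface makes the control flow easy to follow without cloud access. This is a hypothetical sketch, not the actual damavand implementation; in particular it assumes run_application dispatches on each application's configured app_id:

from typing import Any, List


class StubSparkController:
    """Hypothetical stand-in for the controller returned by factory.new()."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.applications: List[Any] = []

    def default_session(self) -> Any:
        # The real controller would build a SparkSession for its target
        # environment; omitted in this sketch.
        raise NotImplementedError

    def run_application(self, id_: str) -> None:
        # Assumption: dispatch matches id_ against each application's
        # Config.app_id ("products", "customer_orders") and runs it; the
        # actual damavand behavior is not shown in this commit.
        for app in self.applications:
            if app.config.app_id == id_:
                app.process()
                return
        raise ValueError(f"no application registered for app_id {id_!r}")

With the entry point above, a run would presumably look like python -m examples.sparkle --app_id products, where the valid --app_id values are the Config.app_id strings defined in the two application modules ("products" and "customer_orders"). Note that the two application imports resolve against different roots (applications.orders versus examples.sparkle.applications.products), so at most one of them resolves for a given sys.path; aligning both on the examples.sparkle.applications package would let the module run from the repository root.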
examples/sparkle/applications/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions examples/sparkle/applications/orders.py
@@ -0,0 +1,34 @@
+from sparkle.config import Config
+from sparkle.writer.iceberg_writer import IcebergWriter
+from sparkle.application import Sparkle
+
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+
+
+class CustomerOrders(Sparkle):
+    def __init__(self, spark_session: SparkSession):
+        super().__init__(
+            spark_session,
+            config=Config(
+                app_name="customer-orders",
+                app_id="customer_orders",
+                version="0.1",
+                database_bucket="s3://bucket-name",
+                kafka=None,
+                input_database=None,
+                output_database=None,
+                iceberg_config=None,
+                spark_trigger='{"once": True}',
+            ),
+            writers=[
+                IcebergWriter(
+                    database_name="default",
+                    database_path="s3://bucket-name/warehouse",
+                    table_name="products",
+                )
+            ],
+        )
+
+    def process(self) -> DataFrame:
+        return self.input["orders"].read()
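A Sparkle application, as used here, bundles three things: a Config identifying the app, a list of writers that persist its output, and a process() method that produces the output DataFrame. A rough local harness for trying CustomerOrders outside the controller might look like the sketch below; it assumes a local Spark installation, and with input_database and kafka both None the "orders" input may not resolve without further configuration:

from pyspark.sql import SparkSession

from examples.sparkle.applications.orders import CustomerOrders

# Local session; the controller normally supplies this via default_session().
spark = (
    SparkSession.builder.master("local[*]")
    .appName("customer-orders-local")
    .getOrCreate()
)

app = CustomerOrders(spark)
df = app.process()  # reads the "orders" input declared in the class
df.show()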
34 changes: 34 additions & 0 deletions examples/sparkle/applications/products.py
@@ -0,0 +1,34 @@
+from sparkle.application import Sparkle
+from sparkle.config import Config
+from sparkle.writer.iceberg_writer import IcebergWriter
+
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+
+
+class Products(Sparkle):
+    def __init__(self, spark_session: SparkSession):
+        super().__init__(
+            spark_session,
+            config=Config(
+                app_name="products",
+                app_id="products",
+                version="0.1",
+                database_bucket="s3://bucket-name",
+                kafka=None,
+                input_database=None,
+                output_database=None,
+                iceberg_config=None,
+                spark_trigger='{"once": True}',
+            ),
+            writers=[
+                IcebergWriter(
+                    database_name="default",
+                    database_path="s3://bucket-name/warehouse",
+                    table_name="products",
+                )
+            ],
+        )
+
+    def process(self) -> DataFrame:
+        return self.input["products"].read()
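Products and CustomerOrders are identical apart from their names and the input key read in process(); both even share table_name="products" in their IcebergWriter, which for CustomerOrders reads like a copy-paste leftover. If more example applications are added, a small helper could centralize the shared Config fields. A hypothetical sketch, using only values that appear in the two classes above:

from sparkle.config import Config


def example_config(app_name: str, app_id: str) -> Config:
    """Build the Config shared by the example applications."""
    return Config(
        app_name=app_name,
        app_id=app_id,
        version="0.1",
        database_bucket="s3://bucket-name",
        kafka=None,
        input_database=None,
        output_database=None,
        iceberg_config=None,
        spark_trigger='{"once": True}',
    )

Each class would then call super().__init__(spark_session, config=example_config(...), writers=[...]) with only its own names and writer target.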
