
Commit

feat: spark added
kkiani committed Aug 8, 2024
1 parent e3854d0 commit 3c7a3c3
Showing 8 changed files with 214 additions and 26 deletions.
4 changes: 4 additions & 0 deletions src/damavand/cloud/aws/__init__.py
@@ -1,5 +1,9 @@
from .object_storage import AwsObjectStorageController
from .spark import AwsSparkController, GlueComponent, GlueComponentArgs

all = [
    AwsObjectStorageController,
    AwsSparkController,
    GlueComponent,
    GlueComponentArgs,
]
161 changes: 161 additions & 0 deletions src/damavand/cloud/aws/spark.py
@@ -0,0 +1,161 @@
import json
import logging
from dataclasses import dataclass
from typing import Optional
from functools import cache

import boto3

import pulumi_aws as aws
from pulumi import Resource as PulumiResource
from pulumi import ComponentResource as PulumiComponentResource
from pulumi import ResourceOptions

# TODO: The following import will be moved to a separate framework
from damavand.sparkle.data_reader import DataReader
from damavand.sparkle.data_writer import DataWriter

from damavand.controllers import SparkController
from damavand.controllers.base_controller import buildtime


logger = logging.getLogger(__name__)


class AwsSparkController(SparkController):
    def __init__(
        self,
        name,
        region: str,
        reader: DataReader,
        writer: DataWriter,
        id_: Optional[str] = None,
        tags: dict[str, str] = {},
        **kwargs,
    ) -> None:
        super().__init__(name, reader, writer, id_, tags, **kwargs)
        self.__glue_client = boto3.client("glue", region_name=region)

    @buildtime
    @cache
    def resource(self) -> PulumiResource:
        return GlueComponent(
            name=self.name,
        )


@dataclass
class GlueComponentArgs:
    role: Optional[aws.iam.Role] = None
    code_repository_bucket: Optional[aws.s3.BucketV2] = None


class GlueComponent(PulumiComponentResource):
    @staticmethod
    def assume_policy() -> dict:
        """Return the assume role policy for Glue jobs."""

        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "glue.amazonaws.com",
                    },
                    "Action": "sts:AssumeRole",
                },
            ],
        }

    @staticmethod
    def managed_policy_arns() -> list[str]:
        """Return a list of managed policy ARNs that define the permissions for Glue jobs."""

        return [
            aws.iam.ManagedPolicy.AWS_GLUE_SERVICE_ROLE,
            aws.iam.ManagedPolicy.AMAZON_S3_FULL_ACCESS,
            aws.iam.ManagedPolicy.CLOUD_TRAIL_FULL_ACCESS,
        ]

    def __init__(
        self,
        name: str,
        args: GlueComponentArgs = GlueComponentArgs(),
        opts: Optional[ResourceOptions] = None,
    ) -> None:
        super().__init__(
            f"Damavand:Spark:{GlueComponent.__name__}",
            name=f"{name}-glue-component",
            props={},
            opts=opts,
            remote=False,
        )

        self.args = args
        # Touch the cached properties so the child resources are created.
        self.code_repository_bucket
        self.iceberg_database
        self.jobs

    @property
    @cache
    def role(self) -> aws.iam.Role:
        """Return an execution role for Glue jobs."""

        return self.args.role or aws.iam.Role(
            resource_name=f"{self._name}-role",
            opts=ResourceOptions(parent=self),
            name=f"{self._name}-ExecutionRole",
            assume_role_policy=json.dumps(self.assume_policy()),
            managed_policy_arns=self.managed_policy_arns(),
        )

    @property
    @cache
    def code_repository_bucket(self) -> aws.s3.BucketV2:
        """Return an S3 bucket that hosts the source code for Glue jobs."""

        # NOTE: using `bucket_prefix` to avoid name conflicts, as bucket names must be globally unique.
        return self.args.code_repository_bucket or aws.s3.BucketV2(
            resource_name=f"{self._name}-code-bucket",
            opts=ResourceOptions(parent=self),
            bucket_prefix=f"{self._name}-code-bucket",
        )

    @property
    @cache
    def iceberg_bucket(self) -> aws.s3.BucketV2:
        """Return an S3 bucket where Iceberg tables store data processed by Glue jobs."""

        # NOTE: using `bucket_prefix` to avoid name conflicts, as bucket names must be globally unique.
        return aws.s3.BucketV2(
            resource_name=f"{self._name}-bucket",
            opts=ResourceOptions(parent=self),
            bucket_prefix=f"{self._name}-bucket",
        )

    @property
    @cache
    def iceberg_database(self) -> aws.glue.CatalogDatabase:
        """Return a Glue catalog database for the Iceberg tables produced by the Glue jobs."""

        return aws.glue.CatalogDatabase(
            resource_name=f"{self._name}-database",
            opts=ResourceOptions(parent=self),
            name=f"{self._name}-database",
            location_uri=f"s3://{self.iceberg_bucket.bucket}/",
        )

    @property
    @cache
    def jobs(self) -> list[aws.glue.Job]:
        """Return all the Glue jobs for the application."""

        return [
            aws.glue.Job(
                resource_name=f"{self._name}-job",
                opts=ResourceOptions(parent=self),
                name=f"{self._name}-job",
                role_arn=self.role.arn,
            )
        ]
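
A brief usage sketch of the new component (not shown in the commit itself): inside a Pulumi program, GlueComponent can be instantiated directly, either with its defaults or with an existing execution role passed through GlueComponentArgs. The resource names below are hypothetical.

import json

import pulumi_aws as aws

from damavand.cloud.aws import GlueComponent, GlueComponentArgs

# Default usage: the component provisions its own execution role, code bucket,
# Iceberg bucket, Glue catalog database, and Glue job.
etl = GlueComponent(name="orders-etl")

# Reusing an existing role instead of letting the component create one.
existing_role = aws.iam.Role(
    "orders-etl-role",
    assume_role_policy=json.dumps(GlueComponent.assume_policy()),
    managed_policy_arns=GlueComponent.managed_policy_arns(),
)
etl_with_role = GlueComponent(
    name="orders-etl-custom",
    args=GlueComponentArgs(role=existing_role),
)
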
4 changes: 2 additions & 2 deletions src/damavand/controllers/__init__.py
@@ -1,11 +1,11 @@
from .base_controller import ApplicationController, runtime, buildtime
from .object_storage import ObjectStorageController
-from .spark import SparkApplicationController
+from .spark import SparkController

all = [
    ApplicationController,
    ObjectStorageController,
-    SparkApplicationController,
+    SparkController,
    runtime,
    buildtime,
]
@@ -5,31 +5,31 @@
from pyspark.sql import SparkSession

from damavand.environment import Environment
-from damavand.resource import BaseResource
-from damavand.resource.resource import buildtime, runtime
+from damavand.controllers import ApplicationController
+from damavand.controllers.base_controller import runtime

# TODO: The following import will be moved to a separate framework
from damavand.sparkle.models import Trigger
-from sparkle.core import Sparkle
-from sparkle.data_reader import DataReader
-from sparkle.data_writer import DataWriter
+from damavand.sparkle.core import Sparkle
+from damavand.sparkle.data_reader import DataReader
+from damavand.sparkle.data_writer import DataWriter


logger = logging.getLogger(__name__)


-class BaseSpark(BaseResource, Sparkle):
+class SparkController(ApplicationController, Sparkle):
    def __init__(
        self,
        name,
-        data_reader: DataReader,
-        data_writer: DataWriter,
+        reader: DataReader,
+        writer: DataWriter,
        id_: Optional[str] = None,
        tags: dict[str, str] = {},
        **kwargs,
    ) -> None:
-        BaseResource.__init__(self, name, id_, tags, **kwargs)
-        Sparkle.__init__(self, reader=data_reader, writer=data_writer)
+        ApplicationController.__init__(self, name, id_, tags, **kwargs)
+        Sparkle.__init__(self, reader=reader, writer=writer)

    @property
    def _spark_extensions(self) -> list[str]:
@@ -96,10 +96,6 @@ def default_session(self) -> SparkSession:
            case _:
                return self.default_cloud_session

-    @buildtime
-    def provision(self):
-        raise NotImplementedError
-
    def _get_spark_session(self, env: Environment) -> SparkSession:
        if env == Environment.LOCAL:
            raise NotImplementedError
@@ -108,6 +104,8 @@ def _get_spark_session(self, env: Environment) -> SparkSession:

    @runtime
    def run(self, trigger: Trigger, session: Optional[SparkSession] = None) -> None:
+        """Run the Spark application with the given trigger and session."""
+
        if session:
            Sparkle.run(self, trigger, session)
        else:
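
For orientation, a hypothetical sketch of how another cloud backend would plug into this base class, mirroring what AwsSparkController does in this commit: forward name, reader, and writer to SparkController and implement a buildtime resource() that returns the provider-specific Pulumi component. Only the imports and the base-class signature below come from the diff; the Azure class itself is illustrative.

from typing import Optional

from pulumi import Resource as PulumiResource

from damavand.controllers import SparkController
from damavand.controllers.base_controller import buildtime
from damavand.sparkle.data_reader import DataReader
from damavand.sparkle.data_writer import DataWriter


class AzureSparkController(SparkController):
    """Illustrative only; Azure support is marked as not implemented in core.py."""

    def __init__(
        self,
        name,
        reader: DataReader,
        writer: DataWriter,
        id_: Optional[str] = None,
        tags: dict[str, str] = {},
        **kwargs,
    ) -> None:
        super().__init__(name, reader, writer, id_, tags, **kwargs)

    @buildtime
    def resource(self) -> PulumiResource:
        # Would return an Azure-specific component (e.g. a Synapse Spark pool);
        # left unimplemented in this sketch.
        raise NotImplementedError
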
42 changes: 34 additions & 8 deletions src/damavand/core.py
@@ -3,9 +3,16 @@
from rich.console import Console

from damavand import utils
-from damavand.controllers import ApplicationController, ObjectStorageController
from damavand.cloud.provider import CloudProvider, AzurermProvider, AwsProvider
-from damavand.cloud.aws import AwsObjectStorageController
+from damavand.cloud.aws import AwsObjectStorageController, AwsSparkController
+from damavand.controllers import (
+    ApplicationController,
+    ObjectStorageController,
+    SparkController,
+)
+
+from damavand.sparkle.data_reader import DataReader
+from damavand.sparkle.data_writer import DataWriter


logger = logging.getLogger(__name__)
@@ -51,29 +58,48 @@ def provision_all_resources(self) -> None:
            _ = controller.resource()

    def new_object_storage(
-        self, name: str, tags: dict, **kwargs
+        self,
+        name: str,
+        tags: dict,
+        **kwargs,
    ) -> ObjectStorageController:
        """Create a new object storage."""
        match self.provider:
            case AwsProvider():
-                resource = AwsObjectStorageController(
+                controller = AwsObjectStorageController(
                    name,
                    region=self.provider.enforced_region,
                    tags={**self.all_tags, **tags},
                    **kwargs,
                )
-                self._controllers.append(resource)
-                return resource
+                self._controllers.append(controller)
+                return controller
            case AzurermProvider():
                raise NotImplementedError("Azure bucket is not implemented yet")
            case _:
                raise Exception("Unknown provider")

-    def new_spark(self, name: str, tags: dict, **kwargs) -> ApplicationController:
+    def new_spark(
+        self,
+        name: str,
+        tags: dict,
+        reader: DataReader,
+        writer: DataWriter,
+        **kwargs,
+    ) -> SparkController:
        """Create a new Spark ETL Application."""
        match self.provider:
            case AwsProvider():
-                raise NotImplementedError("Spark ETL is not implemented yet for AWS")
+                controller = AwsSparkController(
+                    name,
+                    region=self.provider.enforced_region,
+                    reader=reader,
+                    writer=writer,
+                    tags={**self.all_tags, **tags},
+                    **kwargs,
+                )
+                self._controllers.append(controller)
+                return controller
            case AzurermProvider():
                raise NotImplementedError("Spark ETL is not implemented yet for Azure")
            case _:
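
A short usage sketch for the new factory method. The enclosing class in core.py is not named in this diff, so the connection parameter below stands in for an already-constructed instance of it, and the reader/writer arguments are whatever concrete DataReader/DataWriter implementations the project provides.

from damavand.sparkle.data_reader import DataReader
from damavand.sparkle.data_writer import DataWriter


def build_spark_app(connection, reader: DataReader, writer: DataWriter):
    """Create and register a Spark controller, then provision everything.

    `connection` is an instance of the core.py class that owns new_spark();
    it is not named in this diff, so it is passed in here.
    """
    controller = connection.new_spark(
        "orders-etl",
        tags={"team": "data"},
        reader=reader,
        writer=writer,
    )
    # provision_all_resources() walks the registered controllers and calls
    # their buildtime resource() methods (GlueComponent, in the AWS case).
    connection.provision_all_resources()
    return controller
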
1 change: 0 additions & 1 deletion src/damavand/sparkle/handlers.py
@@ -53,7 +53,6 @@ def parsed_args(self) -> argparse.Namespace:
"--not-delete-before-reprocess",
help="By default, reprocess request deletes the table. If you pass this parameter, it will NOT delete the table.",
action="store_false",
type=bool,
)
parser.add_argument(
"--options",
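
The dropped type=bool fixes a genuine argparse incompatibility: a store_false action supplies its own constant value and its constructor does not accept a type argument, so passing both raises a TypeError when the argument is registered. A minimal standalone reproduction of the corrected flag (illustrative, not from the repository):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--not-delete-before-reprocess",
    action="store_false",  # a flag needs no type; the parsed value is already a bool
)

args = parser.parse_args([])
print(args.not_delete_before_reprocess)  # True -- the store_false default

args = parser.parse_args(["--not-delete-before-reprocess"])
print(args.not_delete_before_reprocess)  # False
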
File renamed without changes.
@@ -1,6 +1,6 @@
from _pytest.monkeypatch import MonkeyPatch

-from damavand.resource import buildtime, runtime
+from damavand.controllers import buildtime, runtime


def test_buildtime_decorator(monkeypatch: MonkeyPatch):
