
Commit

added databricks README.md
kundan59 committed Jan 13, 2024
1 parent 89f9e75 commit 6135496
Showing 16 changed files with 256 additions and 64 deletions.
1 change: 1 addition & 0 deletions .github/workflows/car-demo-pipeline.yml
@@ -12,6 +12,7 @@ jobs:
permissions:
contents: read
packages: write
id-token: write
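# 'id-token: write' lets the job request an OIDC token, commonly used for keyless authentication to GCP via Workload Identity Federation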
steps:
- uses: actions/checkout@v3
- name: Set up JDK 19
5 changes: 5 additions & 0 deletions README.md
@@ -197,6 +197,11 @@ To verify the deployment of Kubernetes on Google Cloud Platform (GCP), follow th
Examine logs for applications and services to identify any issues or error messages.
```

#### Data platform
For analytical purposes, the insightful data generated by the microservices is brought into the Databricks
Lakehouse platform.
Find the details here: [README.md](databricks%2FREADME.md)

Documentation
--
- Demo session [available](documentation/automobile ecommerce-platform.pptx)
23 changes: 9 additions & 14 deletions apps-deployment-script.sh
@@ -11,7 +11,6 @@ build_and_deploy_service(){
SERVICE_NAME=$1
CLUSTER_NAME=$2
DEPLOYMENT_NAME=$3
VERSION=$4
echo "---------build and deploy $SERVICE_NAME-----------"
cd "$SERVICE_NAME" || exit
if [ $SERVICE_NAME != "car-ui" ]; then
@@ -29,8 +28,7 @@ build_and_deploy_service(){
chmod u+x ./kustomize

# set docker image for kustomize
# shellcheck disable=SC2140
./kustomize edit set image gcr.io/PROJECT_ID/IMAGE:TAG=gcr.io/"$PROJECT_ID"/"$SERVICE_NAME":"${GITHUB_SHA}_${VERSION}"
./kustomize edit set image gcr.io/PROJECT_ID/IMAGE:TAG=gcr.io/"$PROJECT_ID"/"$SERVICE_NAME":"$GITHUB_SHA"
# deploy through kubectl
./kustomize build . | kubectl apply -f -
kubectl rollout status deployment/"$DEPLOYMENT_NAME"
@@ -39,12 +37,9 @@ build_and_deploy_service(){
}


for STR in $(cat projects-changes-deploy.txt)

for project in $(cat projects-changes-deploy.txt)
do
:
project=$(echo "$STR" | cut -f1 -d-)
version=$(echo "$STR" | cut -f2 -d-)
case $project in
# case 1 build and deploy package common
"common")
@@ -61,37 +56,37 @@ do

# case 3 build and deploy inventory-service
"inventory-service")
build_and_deploy_service inventory-service $GKE_CLUSTER inventoryservice "$version"
build_and_deploy_service inventory-service $GKE_CLUSTER inventoryservice
cd ..;;

# case 4 build and deploy payment-service
"payment-service")
build_and_deploy_service payment-service $GKE_CLUSTER paymentservice "$version"
build_and_deploy_service payment-service $GKE_CLUSTER paymentservice
cd ..;;

# case 5 build and deploy shipment-service
"shipment-service")
build_and_deploy_service shipment-service $GKE_CLUSTER shipmentservice "$version"
build_and_deploy_service shipment-service $GKE_CLUSTER shipmentservice
cd ..;;

# case 6 build and deploy admin-service
"admin-service")
build_and_deploy_service admin-service $GKE_CLUSTER adminservice "$version"
build_and_deploy_service admin-service $GKE_CLUSTER adminservice
cd ..;;

# case 7 build and deploy cart-service
"cart-service")
build_and_deploy_service cart-service $GKE_CLUSTER cartservice "$version"
build_and_deploy_service cart-service $GKE_CLUSTER cartservice
cd ..;;

# case 8 build and deploy car-ui app
"car-ui")
build_and_deploy_service car-ui $GKE_CLUSTER carui "$version"
build_and_deploy_service car-ui $GKE_CLUSTER carui
cd ..;;

# case 9 build and deploy elastic-search
"elastic-search")
build_and_deploy_service elastic-search $GKE_CLUSTER elasticsearch "$version"
build_and_deploy_service elastic-search $GKE_CLUSTER elasticsearch
cd ..;;
esac

@@ -48,7 +48,13 @@ public class PubSubDataHandler implements CloudEventsFunction {
* Constructor for the PubSubDataHandler class.
* Initializes the Firestore instance.
*/
public PubSubDataHandler() {try {firestore = FirestoreOptions.getDefaultInstance().getService();} catch (ApiException e) {logger.severe("Firestore initialization error: "+ e.getMessage());
public PubSubDataHandler() {
try {
firestore = FirestoreOptions
.getDefaultInstance().getService();
} catch (ApiException e) {
logger.severe("Firestore initialization error: "
+ e.getMessage());
}
}

33 changes: 0 additions & 33 deletions common/settings.xml

This file was deleted.

167 changes: 167 additions & 0 deletions databricks/PubSub-shipping-ingestion.py
@@ -0,0 +1,167 @@
# Databricks notebook source
# DBTITLE 1,Authentication Credential to read events from PubSub

client_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_id_1")
client_email_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_email_1")
private_key_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key_1")
private_key_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key_id_1")
authOptions = {"client_id": client_id_secret,
"client_email": client_email_secret,
"private_key": private_key_secret,
"private_key_id": private_key_id_secret}

# COMMAND ----------

# DBTITLE 1, Spark structured streaming ingestion from PubSub topic
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

shipingInputDF = spark.readStream.format("pubsub") \
.option("subscriptionId", "shipment_subscription") \
.option("topicId", "shipping-notification") \
.option("projectId", "datamesh-2") \
.option("numFetchPartitions", "3") \
.options(**authOptions) \
.load()

# COMMAND ----------

# DBTITLE 1,Schema for the shipping events
shipingDetailsSchema = (
StructType()
.add("shipmentId", "string")
.add("orderId", "string")
.add("paymentId", "string")
.add("userId", "string")
.add("firstName", "string")
.add("lastName", "string")
.add("address", "string")
.add("emailId", "string")
.add("mobileNumber", "string")
.add("productId", "string")
.add("brand", "string")
.add("quantity", "integer")
.add("basePrice", "float")
.add("subTotal", "float")
.add("total", "float")
.add("tax", "float")
.add("totalTax", "float")
)

# COMMAND ----------

# DBTITLE 1,Spark data frame for the input shipping event
shipingDetailDF = (
shipingInputDF
.select(
from_json(
col("payload").cast("string"),
shipingDetailsSchema
)
.alias("shipingdata")
)
.select(
"shipingdata.shipmentId",
"shipingdata.orderId",
"shipingdata.paymentId",
"shipingdata.userId",
"shipingdata.firstName",
"shipingdata.lastName",
"shipingdata.address",
"shipingdata.emailId",
"shipingdata.mobileNumber",
"shipingdata.productId",
"shipingdata.brand",
"shipingdata.quantity",
"shipingdata.basePrice",
"shipingdata.subTotal",
"shipingdata.total",
"shipingdata.tax",
"shipingdata.totalTax"
)
)

# COMMAND ----------

# DBTITLE 1,Writing streaming raw shipping data frame to the delta lake table (Bronze table)
shipingDetailDF.writeStream.format("delta") \
.outputMode("append") \
.partitionBy("brand") \
.option("checkpointLocation", "/dbfs/pubsub-shippment-checkpoint-38/") \
.trigger(processingTime = '3 seconds') \
.table("main.car_demo_data_lake.shipping_bronze")

# COMMAND ----------

# DBTITLE 1,Reading streaming shipment events from bronze table
silverDF = spark.readStream.table("main.car_demo_data_lake.shipping_bronze")

# COMMAND ----------

# DBTITLE 1,Creating encryption key
from cryptography.fernet import Fernet

encryptionKey = Fernet.generate_key()
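# Note: generate_key() returns a new key on every run of the notebook; to decrypt
# the encrypted columns later, the key would need to be persisted (e.g. in a secret scope).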

# COMMAND ----------

# DBTITLE 1,Create Spark UDFs in python for encrypting a value
def encrypt_val(clear_text, MASTER_KEY):
    # Encrypt a single string value with the Fernet key created above.
    from cryptography.fernet import Fernet
    f = Fernet(MASTER_KEY)
    clear_text_b = bytes(clear_text, 'utf-8')
    cipher_text = f.encrypt(clear_text_b)
    cipher_text = str(cipher_text.decode('ascii'))
    return cipher_text

# COMMAND ----------

# DBTITLE 1,Use the UDF in a dataframe to encrypt the PII columns
from pyspark.sql.functions import udf, lit, md5
from pyspark.sql.types import StringType

encrypt = udf(encrypt_val, StringType())

encryptedDF = silverDF.withColumn("userId", encrypt("userId",lit(encryptionKey))) \
.withColumn("firstName", encrypt("firstName",lit(encryptionKey))) \
.withColumn("lastName", encrypt("lastName",lit(encryptionKey))) \
.withColumn("address", encrypt("address",lit(encryptionKey))) \
.withColumn("emailId", encrypt("emailId",lit(encryptionKey))) \
.withColumn("mobileNumber", encrypt("mobileNumber",lit(encryptionKey)))

# COMMAND ----------

# DBTITLE 1,Writing transformed silver data frame to the silver table
encryptedDF.writeStream.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/dbfs/pubsub-shippment-sliver-checkpoint-38/") \
.partitionBy("brand") \
.trigger(processingTime = '2 seconds') \
.table("main.car_demo_data_lake.shipping_sliver")

# COMMAND ----------

# DBTITLE 1,Reading streaming data frame from silver table
goldDF = spark.readStream \
.table("main.car_demo_data_lake.shipping_sliver")

# COMMAND ----------

# DBTITLE 1,Aggregate car quantity and selling price for each brand
group_cols = ["brand"]
vechileGoldDF = goldDF.groupBy(group_cols) \
.agg(sum("quantity").alias("total_quantity_shipped"), sum("subTotal").alias("total_selling_price_inr"))

# COMMAND ----------

# DBTITLE 1,Writing aggregated result to Gold table
(
vechileGoldDF.writeStream \
.format("delta") \
.outputMode("complete") \
.partitionBy("brand") \
.option("checkpointLocation", "/dbfs/pubsub-shipping-gold-38/") \
.trigger(processingTime = '1 seconds') \
.table("main.car_demo_all_brands.shipping_data_gold")
)
64 changes: 64 additions & 0 deletions databricks/README.md
@@ -0,0 +1,64 @@
# Car demo data infrastructure and analytics

For analytical purposes, the insightful data generated by the microservices is brought into the Databricks
Lakehouse platform.

The microservices publish data to Pub/Sub topics. A streaming data pipeline receives the streaming events from
Cloud Pub/Sub and writes them to several Databricks Delta Lake tables. The pipeline follows a medallion architecture:
a bronze table keeps the raw data; basic transformations on the raw data, mainly encryption of the PII columns,
produce the silver table; an aggregation over the silver table produces the gold table, which is used for
visualization and reporting.

![img.png](img.png)

A streaming pipeline ([PubSub-shipping-ingestion.py](PubSub-shipping-ingestion.py)) built with Spark Structured Streaming reads shipment events from the Pub/Sub topic
and writes them to the different Delta Lake tables.

### Prerequisites
1. A Databricks workspace whose data plane resides in the GCP account running Pub/Sub and the other services.
   Follow this [document](https://docs.gcp.databricks.com/en/administration-guide/workspace/create-workspace.html) to create a workspace on GCP.
2. Create a [Unity Catalog metastore](https://docs.gcp.databricks.com/en/data-governance/unity-catalog/create-metastore.html).
   Note: Unity Catalog provides centralized access control, auditing, and data discovery capabilities across Databricks workspaces,
   so it is used here for data governance.
   ![img_1.png](img_1.png)
3. All the microservices are deployed on GKE and publish events to a Pub/Sub topic. The streaming pipeline ([PubSub-shipping-ingestion.py](PubSub-shipping-ingestion.py))
   reads events from the topic shipping-notification through a subscription, so the topic and subscription must be created (a minimal sketch follows this list).
4. Connecting Databricks to GCP Pub/Sub requires GCP credentials. The credentials should be stored in the
   Databricks workspace; follow this [document](https://docs.gcp.databricks.com/en/security/secrets/secret-scopes.html) to keep them in a secret scope. The following credentials are read from the secret scope:
   ```
   client_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_id")
   client_email_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_email")
   private_key_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key")
   private_key_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key_id")
   ```
   Pick these values from the GCP service account JSON key.
5. Create a small [compute cluster](https://docs.gcp.databricks.com/en/compute/configure.html) on Databricks.
6. For visualization, install Power BI Desktop on your machine.
   To connect Power BI Desktop to the Databricks cluster, follow this [document](https://docs.gcp.databricks.com/en/partners/bi/power-bi.html).
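
For step 3, the topic and subscription can be created with the gcloud CLI or the Pub/Sub client library. A minimal sketch using the google-cloud-pubsub Python client, assuming the project, topic, and subscription names that the notebook uses:

```
from google.cloud import pubsub_v1

project_id = "datamesh-2"
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, "shipping-notification")
subscription_path = subscriber.subscription_path(project_id, "shipment_subscription")

# Topic the shipment service publishes its events to.
publisher.create_topic(request={"name": topic_path})

# Pull subscription the Databricks streaming pipeline reads from.
subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
```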

### How to run
1. Import ([PubSub-shipping-ingestion.py](PubSub-shipping-ingestion.py)) into a Databricks notebook;
   follow this [document](https://docs.gcp.databricks.com/en/notebooks/notebook-export-import.html#import-a-notebook) to import it.
2. [Attach](https://docs.gcp.databricks.com/en/notebooks/notebook-ui.html#attach) the cluster created above to the notebook.
3. [Run](https://docs.gcp.databricks.com/en/notebooks/run-notebook.html) the notebook.

### Result

The pipeline will create three tables in Unity Catalog:
a) main.car_demo_data_lake.shipping_bronze
b) main.car_demo_data_lake.shipping_sliver
c) main.car_demo_all_brands.shipping_data_gold

Note: these Delta Lake tables are created automatically.

![img_2.png](img_2.png)
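
Once the pipeline is running, the tables can be queried from any notebook attached to the cluster, for example (a small sketch using the table names created by the notebook):

```
# Aggregated gold table that backs the Power BI report.
spark.table("main.car_demo_all_brands.shipping_data_gold").show()

# Count of raw shipment events landed in the bronze table.
spark.sql("SELECT count(*) AS events FROM main.car_demo_data_lake.shipping_bronze").show()
```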

This gold table is connected to Power BI Desktop for the visualization:
![img_3.png](img_3.png)

#### Data governance
All data assets (tables, views, databases) are stored in Unity Catalog, so granting and revoking permissions for a user
or service principal is executed on Databricks Unity Catalog, as is data discovery.
![img_4.png](img_4.png)
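
For example, read access on the gold table could be granted to an analyst group from a notebook (a sketch; the group name `analysts` is hypothetical):

```
# Unity Catalog privileges are managed with standard SQL GRANT/REVOKE statements.
spark.sql("GRANT SELECT ON TABLE main.car_demo_all_brands.shipping_data_gold TO `analysts`")
spark.sql("REVOKE SELECT ON TABLE main.car_demo_all_brands.shipping_data_gold FROM `analysts`")
```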



Binary file added databricks/img.png
Binary file added databricks/img_1.png
Binary file added databricks/img_2.png
Binary file added databricks/img_3.png
Binary file added databricks/img_4.png
@@ -3,6 +3,6 @@
GKE_CLUSTER="$1"
REGION="$2"
# Authenticate axon-server-gke
gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION" --project "datamesh-2"
gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION"

kubectl apply -f axon-server-deployment.yaml
@@ -3,7 +3,7 @@
GKE_CLUSTER="$1"
REGION="$2"
# Authenticate axon-server-gke
gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION" --project "datamesh-2"
gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION"

#Run this command to install CRDS
kubectl create -f https://download.elastic.co/downloads/eck/2.9.0/crds.yaml
