diff --git a/.github/workflows/car-demo-pipeline.yml b/.github/workflows/car-demo-pipeline.yml
index de08b9a1..ecfb9028 100644
--- a/.github/workflows/car-demo-pipeline.yml
+++ b/.github/workflows/car-demo-pipeline.yml
@@ -12,6 +12,7 @@ jobs:
     permissions:
       contents: read
       packages: write
+      id-token: write
     steps:
     - uses: actions/checkout@v3
     - name: Set up JDK 19
diff --git a/README.md b/README.md
index 7367ffef..208f4970 100644
--- a/README.md
+++ b/README.md
@@ -197,6 +197,11 @@ To verify the deployment of Kubernetes on Google Cloud Platform (GCP), follow th
 Examine logs for applications and services to identify any issues or error messages.
 ```
+#### Data platform
+For analytical purposes, the insightful data generated by the microservices is brought into the Databricks
+Lakehouse platform.
+Find the details here: [README.md](databricks%2FREADME.md)
+
 Documentation
 --
 - Demo session [available](documentation/automobile ecommerce-platform.pptx)
diff --git a/apps-deployment-script.sh b/apps-deployment-script.sh
index c490f399..e8191392 100644
--- a/apps-deployment-script.sh
+++ b/apps-deployment-script.sh
@@ -11,7 +11,6 @@ build_and_deploy_service(){
   SERVICE_NAME=$1
   CLUSTER_NAME=$2
   DEPLOYMENT_NAME=$3
-  VERSION=$4
   echo "---------build and deploy $SERVICE_NAME-----------"
   cd "$SERVICE_NAME" || exit
   if [ $SERVICE_NAME != "car-ui" ]; then
@@ -29,8 +28,7 @@ build_and_deploy_service(){
   chmod u+x ./kustomize

   # set docker image for kustomize
-  # shellcheck disable=SC2140
-  ./kustomize edit set image gcr.io/PROJECT_ID/IMAGE:TAG=gcr.io/"$PROJECT_ID"/"$SERVICE_NAME":"${GITHUB_SHA}_${VERSION}"
+  ./kustomize edit set image gcr.io/PROJECT_ID/IMAGE:TAG=gcr.io/"$PROJECT_ID"/"$SERVICE_NAME":"$GITHUB_SHA"

   # deploy through kubectl
   ./kustomize build . | kubectl apply -f -
   kubectl rollout status deployment/"$DEPLOYMENT_NAME"
@@ -39,12 +37,9 @@ build_and_deploy_service(){
 }

-for STR in $(cat projects-changes-deploy.txt)
-
+for project in $(cat projects-changes-deploy.txt)
 do :
-    project=$(echo "$STR" | cut -f1 -d-)
-    version=$(echo "$STR" | cut -f2 -d-)
     case $project in
     # case 1 build and deploy package common
     "common")
@@ -61,37 +56,37 @@ do

     # case 3 build and deploy inventory-service
     "inventory-service")
-        build_and_deploy_service inventory-service $GKE_CLUSTER inventoryservice "$version"
+        build_and_deploy_service inventory-service $GKE_CLUSTER inventoryservice
        cd ..;;

     # case 4 build and deploy payment-service
     "payment-service")
-        build_and_deploy_service payment-service $GKE_CLUSTER paymentservice "$version"
+        build_and_deploy_service payment-service $GKE_CLUSTER paymentservice
        cd ..;;

     # case 5 build and deploy shipment-service
     "shipment-service")
-        build_and_deploy_service shipment-service $GKE_CLUSTER shipmentservice "$version"
+        build_and_deploy_service shipment-service $GKE_CLUSTER shipmentservice
        cd ..;;

     # case 6 build and deploy admin-service
     "admin-service")
-        build_and_deploy_service admin-service $GKE_CLUSTER adminservice "$version"
+        build_and_deploy_service admin-service $GKE_CLUSTER adminservice
        cd ..;;

     # case 7 build and deploy cart-service
     "cart-service")
-        build_and_deploy_service cart-service $GKE_CLUSTER cartservice "$version"
+        build_and_deploy_service cart-service $GKE_CLUSTER cartservice
        cd ..;;

     # case 8 build and deploy car-ui app
     "car-ui")
-        build_and_deploy_service car-ui $GKE_CLUSTER carui "$version"
+        build_and_deploy_service car-ui $GKE_CLUSTER carui
        cd ..;;

     # case 8 build and deploy car-ui app
     "elastic-search")
-        build_and_deploy_service elastic-search $GKE_CLUSTER elasticsearch "$version"
+        build_and_deploy_service elastic-search $GKE_CLUSTER elasticsearch
        cd ..;;
     esac
diff --git a/cloud-function/gcpcarfunction/src/main/java/com/knoldus/cloudfunction/PubSubDataHandler.java b/cloud-function/gcpcarfunction/src/main/java/com/knoldus/cloudfunction/PubSubDataHandler.java
index aacaf929..674b59f5 100644
--- a/cloud-function/gcpcarfunction/src/main/java/com/knoldus/cloudfunction/PubSubDataHandler.java
+++ b/cloud-function/gcpcarfunction/src/main/java/com/knoldus/cloudfunction/PubSubDataHandler.java
@@ -48,7 +48,13 @@ public class PubSubDataHandler implements CloudEventsFunction {
      * Constructor for the PubSubDataHandler class.
      * Initializes the Firestore instance.
      */
-    public PubSubDataHandler() {try {firestore = FirestoreOptions.getDefaultInstance().getService();} catch (ApiException e) {logger.severe("Firestore initialization error: "+ e.getMessage());
+    public PubSubDataHandler() {
+        try {
+            firestore = FirestoreOptions
+                    .getDefaultInstance().getService();
+        } catch (ApiException e) {
+            logger.severe("Firestore initialization error: "
+                    + e.getMessage());
         }
     }
diff --git a/common/settings.xml b/common/settings.xml
deleted file mode 100644
index 615e5732..00000000
--- a/common/settings.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
-
-  <activeProfiles>
-    <activeProfile>github</activeProfile>
-  </activeProfiles>
-
-  <profiles>
-    <profile>
-      <id>github</id>
-      <repositories>
-        <repository>
-          <id>central</id>
-          <url>https://repo1.maven.org/maven2</url>
-        </repository>
-        <repository>
-          <id>github</id>
-          <url>https://maven.pkg.github.com/NashTech-Labs/car-demo</url>
-          <snapshots>
-            <enabled>true</enabled>
-          </snapshots>
-        </repository>
-      </repositories>
-    </profile>
-  </profiles>
-
-  <servers>
-    <server>
-      <id>github</id>
-      <username>kundan59</username>
-      <password>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</password>
-    </server>
-  </servers>
-</settings>
diff --git a/databricks/PubSub-shipping-ingestion.py b/databricks/PubSub-shipping-ingestion.py
new file mode 100644
index 00000000..421821b5
--- /dev/null
+++ b/databricks/PubSub-shipping-ingestion.py
@@ -0,0 +1,167 @@
+# Databricks notebook source
+# DBTITLE 1,Authentication credentials to read events from PubSub
+
+client_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_id_1")
+client_email_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_email_1")
+private_key_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key_1")
+private_key_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key_id_1")
+authOptions = {"client_id": client_id_secret,
+               "client_email": client_email_secret,
+               "private_key": private_key_secret,
+               "private_key_id": private_key_id_secret}
+
+# COMMAND ----------

+# DBTITLE 1,Spark structured streaming ingestion from PubSub topic
+from pyspark.sql.functions import *
+from pyspark.sql.types import *
+from pyspark.sql import *
+
+shipingInputDF = spark.readStream.format("pubsub") \
+.option("subscriptionId", "shipment_subscription") \
+.option("topicId", "shipping-notification") \
+.option("projectId", "datamesh-2") \
+.option("numFetchPartitions", "3") \
+.options(**authOptions) \
+.load()
+
+# COMMAND ----------
+
+# DBTITLE 1,Schema for the shipping events
+shipingDetailsSchema = (
+    StructType()
+    .add("shipmentId", "string")
+    .add("orderId", "string")
+    .add("paymentId", "string")
+    .add("userId", "string")
+    .add("firstName", "string")
+    .add("lastName", "string")
+    .add("address", "string")
+    .add("emailId", "string")
+    .add("mobileNumber", "string")
+    .add("productId", "string")
+    .add("brand", "string")
+    .add("quantity", "integer")
+    .add("basePrice", "float")
+    .add("subTotal", "float")
+    .add("total", "float")
+    .add("tax", "float")
+    .add("totalTax", "float")
+)
+
+# COMMAND ----------
+
+# DBTITLE 1,Spark data frame for the input shipping event
+shipingDetailDF = (
+    shipingInputDF
+    .select(
+        from_json(
+            col("payload").cast("string"),
+            shipingDetailsSchema
+        )
+        .alias("shipingdata")
+    )
+    .select(
+        "shipingdata.shipmentId",
+        "shipingdata.orderId",
+        "shipingdata.paymentId",
+        "shipingdata.userId",
+        "shipingdata.firstName",
+        "shipingdata.lastName",
+        "shipingdata.address",
+        "shipingdata.emailId",
+        "shipingdata.mobileNumber",
+        "shipingdata.productId",
+        "shipingdata.brand",
+        "shipingdata.quantity",
+        "shipingdata.basePrice",
+        "shipingdata.subTotal",
+        "shipingdata.total",
+        "shipingdata.tax",
+        "shipingdata.totalTax"
+    )
+)
+
+# COMMAND ----------
+
+# DBTITLE 1,Writing streaming raw shipping data frame to the delta lake table (Bronze table)
+shipingDetailDF.writeStream.format("delta") \
+.outputMode("append") \
+.partitionBy("brand") \
+.option("checkpointLocation", "/dbfs/pubsub-shippment-checkpoint-38/") \
+.trigger(processingTime = '3 seconds') \
+.table("main.car_demo_data_lake.shipping_bronze")
+
+# COMMAND ----------
+
+# DBTITLE 1,Reading streaming shipment events from bronze table
+silverDF = spark.readStream.table("main.car_demo_data_lake.shipping_bronze")
+
+# COMMAND ----------
+
+# DBTITLE 1,Creating encryption key
+from cryptography.fernet import Fernet
+
+encryptionKey = Fernet.generate_key()
+
+# COMMAND ----------
+
+# DBTITLE 1,Create a Spark UDF in Python for encrypting a value
+def encrypt_val(clear_text,MASTER_KEY):
+    from cryptography.fernet import Fernet
+    f = Fernet(MASTER_KEY)
+    clear_text_b=bytes(clear_text, 'utf-8')
+    cipher_text = f.encrypt(clear_text_b)
+    cipher_text = str(cipher_text.decode('ascii'))
+    return cipher_text
+
+# COMMAND ----------
+
+# DBTITLE 1,Use the UDF in a dataframe to encrypt the PII columns
+from pyspark.sql.functions import udf, lit, md5
+from pyspark.sql.types import StringType
+
+encrypt = udf(encrypt_val, StringType())
+
+encryptedDF = silverDF.withColumn("userId", encrypt("userId",lit(encryptionKey))) \
+    .withColumn("firstName", encrypt("firstName",lit(encryptionKey))) \
+    .withColumn("lastName", encrypt("lastName",lit(encryptionKey))) \
+    .withColumn("address", encrypt("address",lit(encryptionKey))) \
+    .withColumn("emailId", encrypt("emailId",lit(encryptionKey))) \
+    .withColumn("mobileNumber", encrypt("mobileNumber",lit(encryptionKey)))
+
+# COMMAND ----------
+
+# DBTITLE 1,Writing transformed silver data frame to the silver table
+encryptedDF.writeStream.format("delta") \
+.outputMode("append") \
+.option("checkpointLocation", "/dbfs/pubsub-shippment-sliver-checkpoint-38/") \
+.partitionBy("brand") \
+.trigger(processingTime = '2 seconds') \
+.table("main.car_demo_data_lake.shipping_sliver")
+
+# COMMAND ----------
+
+# DBTITLE 1,Reading streaming data frame from silver table
+goldDF = spark.readStream \
+.table("main.car_demo_data_lake.shipping_sliver")
+
+# COMMAND ----------
+
+# DBTITLE 1,Aggregate shipped car quantity and price for each brand
+group_cols = ["brand"]
+vechileGoldDF = goldDF.groupBy(group_cols) \
+.agg(sum("quantity").alias("total_quantity_shipped"), sum("subTotal").alias("total_selling_price_inr"))
+
+# COMMAND ----------
+
+# DBTITLE 1,Writing aggregated result to Gold table
+(
+    vechileGoldDF.writeStream \
+    .format("delta") \
+    .outputMode("complete") \
+    .partitionBy("brand") \
+    .option("checkpointLocation", "/dbfs/pubsub-shipping-gold-38/") \
+    .trigger(processingTime = '1 seconds') \
+    .table("main.car_demo_all_brands.shipping_data_gold")
+)
diff --git a/databricks/README.md b/databricks/README.md
new file mode 100644
index 00000000..8e0ff80a
--- /dev/null
+++ b/databricks/README.md
@@ -0,0 +1,64 @@
+# Car demo data infrastructure and analytics
+
+For analytical purposes, the insightful data generated by the microservices is brought into the Databricks
+Lakehouse platform.
+
+The microservices publish data to Pub/Sub topics. A streaming data pipeline receives the streaming events from
+Cloud Pub/Sub and writes them to several Databricks Delta Lake tables. The pipeline follows the medallion architecture:
+a bronze table keeps the raw data; the raw data then goes through some basic transformations, mainly encryption of the
+PII columns, and is written to the silver table; from the silver table we run aggregations and write the aggregated
+result to the gold table, which is used for visualization and reporting.
+
+![img.png](img.png)
+
+A streaming pipeline ([PubSub-shipping-ingestion.py](PubSub-shipping-ingestion.py)) created with Spark Structured Streaming reads shipment events from the Pub/Sub topic
+and writes them to the different Delta Lake tables.
+
+### Prerequisite
+1. A Databricks workspace created with its data plane residing in the GCP account that runs Pub/Sub and the other services.
+   Follow this [document](https://docs.gcp.databricks.com/en/administration-guide/workspace/create-workspace.html) to create a workspace on GCP.
+2. Create a [Unity Catalog metastore](https://docs.gcp.databricks.com/en/data-governance/unity-catalog/create-metastore.html).
+   Note: Unity Catalog provides centralized access control, auditing, and data discovery capabilities across Databricks workspaces,
+   so Unity Catalog is used for data governance.
+   ![img_1.png](img_1.png)
+3. All the microservices are deployed on GKE and publish events to the Pub/Sub topic. The streaming pipeline ([PubSub-shipping-ingestion.py](PubSub-shipping-ingestion.py))
+   reads events from the topic shipping-notification through a subscription, so the topic and the subscription must be created.
+4. To connect Databricks to GCP Pub/Sub, GCP credentials are needed. The credentials should be stored in the
+   Databricks workspace; follow this [document](https://docs.gcp.databricks.com/en/security/secrets/secret-scopes.html) to keep them in a secret scope.
+   The following credentials have to be kept in the secret scope (see the sketch after the "How to run" section):
+   ```
+   client_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_id")
+   client_email_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "client_email")
+   private_key_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key")
+   private_key_id_secret = dbutils.secrets.get(scope = "gcp-pubsub", key = "private_key_id")
+   ```
+   Pick these values from the GCP service account JSON key.
+5. Create a small [compute cluster](https://docs.gcp.databricks.com/en/compute/configure.html) on Databricks.
+6. For visualization, install Power BI Desktop on your machine.
+   To connect Power BI Desktop to the Databricks cluster, follow this [document](https://docs.gcp.databricks.com/en/partners/bi/power-bi.html).
+
+### How to run
+1. Import the ([PubSub-shipping-ingestion.py](PubSub-shipping-ingestion.py)) file into a Databricks notebook;
+   follow this [document](https://docs.gcp.databricks.com/en/notebooks/notebook-export-import.html#import-a-notebook) to import it.
+2. [Attach](https://docs.gcp.databricks.com/en/notebooks/notebook-ui.html#attach) the cluster you created to the notebook.
+3. [Run](https://docs.gcp.databricks.com/en/notebooks/run-notebook.html) the notebook.
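+
+As a complement to prerequisite 4, the sketch below shows one way the secret scope and secrets could be
+created with the Databricks SDK for Python, as an alternative to the UI/CLI flow linked above. The scope and
+key names mirror PubSub-shipping-ingestion.py; the key-file path and the workspace authentication are assumptions.
+
+```python
+import json
+
+from databricks.sdk import WorkspaceClient
+
+w = WorkspaceClient()  # assumes host/token are configured in the environment or ~/.databrickscfg
+
+# Load the GCP service account key downloaded from the GCP console (path is an assumption)
+with open("key.json") as f:
+    sa = json.load(f)
+
+# Create the scope the notebook reads from and store the four credential fields it expects
+w.secrets.create_scope(scope="gcp-pubsub")
+for secret_key, sa_field in [("client_id_1", "client_id"),
+                             ("client_email_1", "client_email"),
+                             ("private_key_1", "private_key"),
+                             ("private_key_id_1", "private_key_id")]:
+    w.secrets.put_secret(scope="gcp-pubsub", key=secret_key, string_value=sa[sa_field])
+```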
+
+### Result
+
+The pipeline will create three tables in Unity Catalog:
+a) main.car_demo_data_lake.shipping_bronze
+b) main.car_demo_data_lake.shipping_sliver
+c) main.car_demo_all_brands.shipping_data_gold
+
+Note: these Delta Lake tables are created automatically.
+
+![img_2.png](img_2.png)
+
+The gold table is connected to Power BI Desktop for visualization.
+![img_3.png](img_3.png)
+
+#### Data governance
+All data assets (tables, views, databases) are stored in Unity Catalog, so granting and revoking permissions for a user
+or service principal is executed on the Databricks Unity Catalog (a minimal GRANT/REVOKE sketch is appended at the end
+of this diff). Data discovery is also done in Unity Catalog.
+![img_4.png](img_4.png)
+
+
+
diff --git a/databricks/img.png b/databricks/img.png
new file mode 100644
index 00000000..3a54977c
Binary files /dev/null and b/databricks/img.png differ
diff --git a/databricks/img_1.png b/databricks/img_1.png
new file mode 100644
index 00000000..3957606d
Binary files /dev/null and b/databricks/img_1.png differ
diff --git a/databricks/img_2.png b/databricks/img_2.png
new file mode 100644
index 00000000..39cabbb4
Binary files /dev/null and b/databricks/img_2.png differ
diff --git a/databricks/img_3.png b/databricks/img_3.png
new file mode 100644
index 00000000..eb6e92ce
Binary files /dev/null and b/databricks/img_3.png differ
diff --git a/databricks/img_4.png b/databricks/img_4.png
new file mode 100644
index 00000000..a7e29aa7
Binary files /dev/null and b/databricks/img_4.png differ
diff --git a/deployment/gcpresources/terraform/axon-server-deployment.sh b/deployment/gcpresources/terraform/axon-server-deployment.sh
index dc2efce0..0ebf02dc 100644
--- a/deployment/gcpresources/terraform/axon-server-deployment.sh
+++ b/deployment/gcpresources/terraform/axon-server-deployment.sh
@@ -3,6 +3,6 @@ GKE_CLUSTER="$1"
 REGION="$2"

 # Authenticate axon-server-gke
-gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION" --project "datamesh-2"
+gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION"

 kubectl apply -f axon-server-deployment.yaml
\ No newline at end of file
diff --git a/deployment/gcpresources/terraform/gcp-elasticsearch-deployment.sh b/deployment/gcpresources/terraform/gcp-elasticsearch-deployment.sh
index f75267b2..7fbcb1b0 100644
--- a/deployment/gcpresources/terraform/gcp-elasticsearch-deployment.sh
+++ b/deployment/gcpresources/terraform/gcp-elasticsearch-deployment.sh
@@ -3,7 +3,7 @@ GKE_CLUSTER="$1"
 REGION="$2"

 # Authenticate axon-server-gke
-gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION" --project "datamesh-2"
+gcloud container clusters get-credentials "$GKE_CLUSTER" --region "$REGION"

 #Run this command to install CRDS
 kubectl create -f https://download.elastic.co/downloads/eck/2.9.0/crds.yaml
diff --git a/deployment/gcpresources/terraform/key.json b/deployment/gcpresources/terraform/key.json
index 1d0414d3..8b137891 100644
--- a/deployment/gcpresources/terraform/key.json
+++ b/deployment/gcpresources/terraform/key.json
@@ -1,13 +1 @@
-{
-  "type": "service_account",
-  "project_id": "datamesh-2",
-  "private_key_id": "585b6eed36a592043e8f444bebdcf14d914e78f4",
-  "private_key": "-----BEGIN PRIVATE
KEY-----\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCp1K2e9DkBRSaD\nfZKYba1ZHfVNQ3drUlf1E+aeoc4r8tb4ip8ZcZRnhK0Rip4rpPcW1a1Gq2uZyddi\nzjF59ylJTCFAmuLz2AN3aLSLmBc5npZdewp6LdOcSl2SN8z5OHe2DUmDn0BIVbVD\nGHfQy12p3V+bRWHP8+TVpo/YsqL3YLMcvbG+Nn5o2juUlb3MssDF3YPC/GIirBWz\n0smXVnYtnNQg7ZIlLA8u9RVw3p3P/QQ7BF3Ssc1HzoGXEBQRV/O8VDNySJ0RRe/r\nzp7/hXbA4u3mXdnuXBH/kSNPnqzto9GfL5VzwEdFpRNdPfXraeaM/Gkp8KUQmWP9\ns2s7rhXzAgMBAAECggEAPe64qBRg63Unv+DdwmeQxUR3DceSN5mOSOoKeoQANye2\nPGn07ibs2zlXkeOOndTpZLLknzZpJWqleLs/8L3xGPFB2RkCtbDrRrLw0E1U/ua1\n5/40Sm0G0+KMsANYZPQyN7otummrXBWhZgf4vbREAmuf73REYL0NENrOb1RpuLM6\n4TvzhblhbJG6eaGmvJkRCC0I5Oo7GzJLX3NmgcsMLhuPeyXw7/1h5gU8K6vYWLF4\nhQPMrsCKkBpPxhkiL7PWoPd5MTqVly+d/4wNoedt63fqziStEq4eY9mzyx3nKQia\nBwPx5x7fP7uRqQ/Mb6aNoh6A2lDegseNdy6HwmNbeQKBgQDhm7wZ2k7SnFfqzJiJ\nUYPP0n5U5zDFtzt9cpoygyA5RC81Lii7MG9U4JfvewhiQ2+mAm629R8nkjvRcZf8\nGLtN4jmZDkPHsG4QXWnphyc/TSf2F+e1y1ETzxj61dLJ59Ehbl43g0mVSuD6dEGG\njG+ZSSaJxOcC20evsoaGKG3Z6QKBgQDAtWoFxcOHYVaxQYrFFuNdXABQ0esBmfuB\noSEL80X6kt8F10iylJZRQ73w3qFSp0NrNXrsqUbcI2kIRAI3NStSX7JpETy+hbnl\nJVhO8KGHmiTDRA2COnpKTOWRxEAHY5gd801HWRdByTSr0pP4kJEclEsnQ/wfdjxT\nqZpTCQJrewKBgGSqEmIOsID01Z2ksKMExiWirE/B4Fc4DMlKq1bCf0xOCipWcEIl\nuGdM1z45zWHa6SRPjYsBEGedricj4kbP2jNEL6MFZGTjahjPNYTHk1GBYcVCoVbS\nY1jNXT3+zlDcqMmb71ohsGF2ya+2083vN5RTuWhbfSAfK3tP0wplW66RAoGAUf0d\nRaV1YiT0yGeVTfNF+tOIwtn8/WpxCh7uu3HeZRSV1Jtar7fuGFcU+eUOh+boFTXk\nvcnrfv+F0WF1+90gOGTEuFUAHiMxCyAPaU6RuurHF1jd0nlkzDXZOmJfx4UFFEEO\nsPtjnLapNS1bjVaclqc1LliLbStV0VaIdSmRIP8CgYBHZWE8Eh8C3F2P7s5F3SzR\nHszPsr3UfNAFIg0EKfW+TG6NFGZLKLU5CT7Od2a9G2KrHGaAsjCZ9jaZXKP0HsvO\nyCJ4DXB4NlPxfZQYAHA4WhVaXcTfQvRT64myCLxvqtRhtcBhPh3o/4OpSrY0Ehge\nMXn0dawXjLrAJsBsUgersQ==\n-----END PRIVATE KEY-----\n", - "client_email": "java-car-demo@datamesh-2.iam.gserviceaccount.com", - "client_id": "116961686900137160069", - "auth_uri": "https://accounts.google.com/o/oauth2/auth", - "token_uri": "https://oauth2.googleapis.com/token", - "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/java-car-demo%40datamesh-2.iam.gserviceaccount.com", - "universe_domain": "googleapis.com" -} + diff --git a/projects-changes-deploy.txt b/projects-changes-deploy.txt index f4cb766a..1434ad97 100644 --- a/projects-changes-deploy.txt +++ b/projects-changes-deploy.txt @@ -1,2 +1 @@ car-ui -admin-service 1.1.0
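
Relating to the "Data governance" section of databricks/README.md above, here is a minimal sketch of how read
access to the gold table could be granted and revoked through Unity Catalog SQL run from a notebook. The
`analysts` group name is a placeholder; the catalog, schema, and table names are the ones created by the pipeline.

```python
# Grant the privileges a principal needs to query the aggregated gold table
spark.sql("GRANT USE CATALOG ON CATALOG main TO `analysts`")
spark.sql("GRANT USE SCHEMA ON SCHEMA main.car_demo_all_brands TO `analysts`")
spark.sql("GRANT SELECT ON TABLE main.car_demo_all_brands.shipping_data_gold TO `analysts`")

# Revoke the table-level privilege again when access should be removed
spark.sql("REVOKE SELECT ON TABLE main.car_demo_all_brands.shipping_data_gold FROM `analysts`")
```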