From 62fd086be09c7391cef12c2ebe28abd4668a6a5b Mon Sep 17 00:00:00 2001 From: Giacomo Debidda Date: Sun, 11 Sep 2022 19:42:56 +0200 Subject: [PATCH] feat: add example of data serverless pipeline with GCP workflows and Eventarc --- .../bigquery-schemas/weather_data_table.json | 49 +++++++++++++++ docs/service-accounts.md | 9 +++ iam-policies/project.yaml | 6 +- workflows/README.md | 29 +++++++++ workflows/serverless-data-pipeline.yaml | 59 +++++++++++++++++++ 5 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 assets/bigquery-schemas/weather_data_table.json create mode 100644 workflows/serverless-data-pipeline.yaml diff --git a/assets/bigquery-schemas/weather_data_table.json b/assets/bigquery-schemas/weather_data_table.json new file mode 100644 index 00000000..55b1d7a3 --- /dev/null +++ b/assets/bigquery-schemas/weather_data_table.json @@ -0,0 +1,49 @@ +[ + { + "name": "sensorId", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "timecollected", + "description": "UTC date string of when the sensor reading was collected", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "zipcode", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "latitude", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "longitude", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "temperature", + "description": "temperature in Fahrenheit", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "humidity", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "dewpoint", + "type": "FLOAT", + "mode": "NULLABLE" + }, + { + "name": "pressure", + "type": "FLOAT", + "mode": "NULLABLE" + } +] diff --git a/docs/service-accounts.md b/docs/service-accounts.md index 89a2a715..131512e0 100644 --- a/docs/service-accounts.md +++ b/docs/service-accounts.md @@ -33,6 +33,15 @@ gcloud iam service-accounts create sa-dash-earthquakes \ --display-name "dash-earthquakes SA" ``` +### sa-dataflow-worker + +Create a service account to run 
[Dataflow](https://cloud.google.com/dataflow/docs) jobs: + +```sh +gcloud iam service-accounts create sa-dataflow-worker \ + --display-name "SA Dataflow worker" +``` + ### sa-firestore-user-test Service account that I use in [firestore-utils](../packages/firestore-utils/README.md) tests. diff --git a/iam-policies/project.yaml b/iam-policies/project.yaml index 6db64d7e..15e2d54d 100644 --- a/iam-policies/project.yaml +++ b/iam-policies/project.yaml @@ -26,7 +26,7 @@ bindings: - members: - serviceAccount:sa-telegram-bot@prj-kitchen-sink.iam.gserviceaccount.com - serviceAccount:sa-wasm-news@prj-kitchen-sink.iam.gserviceaccount.com - - serviceAccount:sa-webhooks@prj-kitchen-sink.iam.gserviceaccount.com + - serviceAccount:sa-webhooks@prj-kitchen-sink.iam.gserviceaccount.com role: roles/clouddebugger.agent - members: - serviceAccount:1051247446620@cloudbuild.gserviceaccount.com @@ -146,9 +146,9 @@ bindings: - serviceAccount:1051247446620@cloudbuild.gserviceaccount.com - serviceAccount:sa-matsuri-demo-app@prj-kitchen-sink.iam.gserviceaccount.com - serviceAccount:sa-notifier@prj-kitchen-sink.iam.gserviceaccount.com - - serviceAccount:sa-telegram-bot@prj-kitchen-sink.iam.gserviceaccount.com + - serviceAccount:sa-telegram-bot@prj-kitchen-sink.iam.gserviceaccount.com - serviceAccount:sa-wasm-news@prj-kitchen-sink.iam.gserviceaccount.com - - serviceAccount:sa-webhooks@prj-kitchen-sink.iam.gserviceaccount.com + - serviceAccount:sa-webhooks@prj-kitchen-sink.iam.gserviceaccount.com - serviceAccount:sa-workflows-runner@prj-kitchen-sink.iam.gserviceaccount.com role: roles/secretmanager.secretAccessor - members: diff --git a/workflows/README.md b/workflows/README.md index 82acdef8..ba7881a8 100644 --- a/workflows/README.md +++ b/workflows/README.md @@ -40,6 +40,18 @@ gcloud workflows deploy lead-generation \ --labels customer=$CUSTOMER,environment=$ENVIRONMENT,resource=workflow ``` +### Serverless data pipeline + +```sh +gcloud workflows deploy serverless-data-pipeline \ + --project 
$GCP_PROJECT_ID \ + --location $WORKFLOW_LOCATION \ + --description "Serverless data pipeline (variation of the codelab 'Building a Serverless Data Pipeline: IoT to Analytics')" \ + --source workflows/serverless-data-pipeline.yaml \ + --service-account $SA_WORKFLOWS_RUNNER \ + --labels customer=$CUSTOMER,environment=$ENVIRONMENT,resource=workflow +``` + ### Web performance audit ```sh @@ -156,3 +168,20 @@ You can find the list of available [Compute Engine images](https://cloud.google. ```sh gcloud compute images list ``` + +### Publish message to PubSub topic + +```sh +gcloud pubsub topics publish weather-data \ + --message '{ + "sensorId": "sensor-xyz", + "zipcode": 55049, + "temperature": 98.5, + "timecollected": "2022-08-15 15:29:35", + "latitude": 43.8657, + "longitude": 10.2513, + "humidity": 1.23, + "dewpoint": 4.56, + "pressure": 7.89 + }' +``` \ No newline at end of file diff --git a/workflows/serverless-data-pipeline.yaml b/workflows/serverless-data-pipeline.yaml new file mode 100644 index 00000000..0b3f583f --- /dev/null +++ b/workflows/serverless-data-pipeline.yaml @@ -0,0 +1,59 @@ +# https://codelabs.developers.google.com/codelabs/iot-data-pipeline +main: + params: [args] + steps: + + - assign_variables: + assign: + - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} + - dataset_id: weather_data + - table_id: weather_data_table + - base64_decoded: "${base64.decode(args.data.data)}" + - json_data: "${json.decode(base64_decoded)}" + + # - log_stuff: + # call: sys.log + # args: + # data: { + # "json_data": "${json_data}", + # "dataset_id": "${dataset_id}", + # "table_id": "${table_id}" + # } + # severity: "WARNING" + + # - list_table_data: + # # https://cloud.google.com/workflows/docs/reference/googleapis/bigquery/v2/tabledata/list + # call: googleapis.bigquery.v2.tabledata.list + # args: + # projectId: ${project_id} + # datasetId: ${dataset_id} + # tableId: ${table_id} + # maxResults: 30 + # result: tabledata_list_result + + - insert_all_table_data: + 
# https://cloud.google.com/workflows/docs/reference/googleapis/bigquery/v2/tabledata/insertAll + call: googleapis.bigquery.v2.tabledata.insertAll + args: + projectId: ${project_id} + datasetId: ${dataset_id} + tableId: ${table_id} + # https://cloud.google.com/workflows/docs/reference/googleapis/bigquery/v2/Overview#TableDataInsertAllRequest + body: { + "rows": [ + {"json": "${json_data}"} + ] + } + # body: { + # "rows": [ + # {"json": {"sensorId": "sensor-abc", "zipcode": 90210, "temperature": 80.5}}, + # {"json": {"sensorId": "sensor-def", "zipcode": 10048, "temperature": 101.75}} + # ] + # } + result: tabledata_insertAll_result + + - respond_to_caller: + return: { + "tabledata_insertAll_result": "${tabledata_insertAll_result}" + # "tabledata_list_result": "${tabledata_list_result}" + } \ No newline at end of file