From 5f0901e0f3ebc4f8552145bd95de52fc521192df Mon Sep 17 00:00:00 2001 From: Prashanth R Date: Wed, 10 Jan 2024 10:20:52 -0800 Subject: [PATCH] Plumb in Bigtable appProfileID "batch" to Base (#249) * Plumb in Bigtable appProfileID "batch" to Base * Make param optional * Update branch.yaml to use flex. * Add missing tempLocation --- .../dataflow/csv2bt-template-metadata.json | 9 +++++- bigtable_automation/java/dataflow/pom.xml | 2 +- .../dataflow/CloudBigtableOptions.java | 5 +++ .../org/datacommons/dataflow/CsvImport.java | 5 +++ gcf/README.md | 14 ++++----- gcf/base.go | 3 +- gcf/base/gcp/config/base.yaml | 2 ++ gcf/base/gcp/config/branch.yaml | 4 ++- gcf/base/local/deploy.sh | 4 +-- gcf/base/local/local_flex.yaml | 1 + gcf/custom.go | 2 +- gcf/lib/dataflow_launcher.go | 31 ++++++++++++------- 12 files changed, 56 insertions(+), 26 deletions(-) diff --git a/bigtable_automation/java/dataflow/csv2bt-template-metadata.json b/bigtable_automation/java/dataflow/csv2bt-template-metadata.json index bddbeb75..bab8eae4 100644 --- a/bigtable_automation/java/dataflow/csv2bt-template-metadata.json +++ b/bigtable_automation/java/dataflow/csv2bt-template-metadata.json @@ -32,6 +32,13 @@ "helpText": "The Cloud Bigtable table ID in the instance.", "paramType": "TEXT" }, + { + "name": "bigtableAppProfileId", + "label": "Bigtable App Profile ID", + "helpText": "The App Profile ID to use for Bigtable writes.", + "paramType": "TEXT", + "isOptional": true + }, { "name": "tempLocation", "label": "GCP temp location", @@ -39,4 +46,4 @@ "paramType": "TEXT" } ] -} \ No newline at end of file +} diff --git a/bigtable_automation/java/dataflow/pom.xml b/bigtable_automation/java/dataflow/pom.xml index 61e44cc5..eb70ac00 100644 --- a/bigtable_automation/java/dataflow/pom.xml +++ b/bigtable_automation/java/dataflow/pom.xml @@ -7,7 +7,7 @@ jar - 2.43.0 + 2.53.0 1.4.0 1.7.21 1.7 diff --git a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java index c67860c3..89e1ee8d 100644 --- a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java +++ b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java @@ -23,4 +23,9 @@ public interface CloudBigtableOptions extends DataflowPipelineOptions { @SuppressWarnings("unused") void setBigtableTableId(ValueProvider bigtableTableId); + @Description("The App Profile to use for Bigtable writes.") + ValueProvider getBigtableAppProfileId(); + + @SuppressWarnings("unused") + void setBigtableAppProfileId(ValueProvider bigtableAppProfileId); } diff --git a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java index ff0a8c5a..e18045a0 100644 --- a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java +++ b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.hadoop.hbase.shaded.org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +127,10 @@ public static void main(String[] args) throws IllegalArgumentException { .withProjectId(options.getBigtableProjectId()) .withInstanceId(options.getBigtableInstanceId()) .withTableId(options.getBigtableTableId()); + if (options.getBigtableAppProfileId().isAccessible() && + StringUtils.isNotEmpty(options.getBigtableAppProfileId().get())) { + write = write.withAppProfileId(options.getBigtableAppProfileId()); + } // Write with results. PCollection writeResults = cacheData .apply("WriteToBigtable", write.withWriteResults()); diff --git a/gcf/README.md b/gcf/README.md index a1ee6e76..6049affd 100644 --- a/gcf/README.md +++ b/gcf/README.md @@ -28,7 +28,7 @@ production entry point can be tested locally through `local/main.go` 0. First start the Cloud Function locally, as follows: ```bash - ./local/deploy.sh + ./base/local/deploy.sh ``` 1. A test branch cache exists in this [folder](https://pantheon.corp.google.com/storage/browser/prophet_cache/dcbranch_2022_05_06_16_16_13). @@ -36,13 +36,13 @@ production entry point can be tested locally through `local/main.go` Just in case a test was run with that cache before, clean-up first: ```bash - ./local/test.sh cleanup + ./base/local/test.sh cleanup ``` 2. Fake an init trigger from Google pipeline: ```bash - ./local/test.sh init + ./base/local/test.sh init ``` To validate this step: @@ -62,7 +62,7 @@ production entry point can be tested locally through `local/main.go` 3. Fake a completion trigger from Dataflow job: ```bash - ./local/test.sh completed + ./base/local/test.sh completed ``` Validate this step by confirming that the @@ -74,8 +74,8 @@ production entry point can be tested locally through `local/main.go` After validating the change in test environment, deploy to PROD by running: ```bash -./prod/deploy.sh base -./prod/deploy.sh branch +./base/gcp/deploy.sh base +./base/gcp/deploy.sh branch ``` When this completes, look at the @@ -86,5 +86,5 @@ To deploy private GCF, identify the environment, pick the corresponding yaml files in `private/*.yaml` and run ```bash -./private/deploy.sh +./base/custom/deploy.sh ``` diff --git a/gcf/base.go b/gcf/base.go index 7aee93e8..b2eefa9d 100644 --- a/gcf/base.go +++ b/gcf/base.go @@ -31,6 +31,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error { dataflowTemplate := os.Getenv("dataflowTemplate") dataPath := os.Getenv("dataPath") controlPath := os.Getenv("controlPath") + appProfileID := os.Getenv("appProfileID") if projectID == "" { return errors.New("projectID is not set in environment") } @@ -86,7 +87,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error { if err := lib.SetupBT(ctx, projectID, instance, tableID); err != nil { return err } - err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate) + err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID) if err != nil { if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil { log.Printf("Failed to delete BT table on failed GCS write: %v", errDeleteBT) diff --git a/gcf/base/gcp/config/base.yaml b/gcf/base/gcp/config/base.yaml index a28b85d2..2878148f 100644 --- a/gcf/base/gcp/config/base.yaml +++ b/gcf/base/gcp/config/base.yaml @@ -2,5 +2,7 @@ projectID: "datcom-store" instance: "prophet-cache" cluster: "prophet-cache-c1" dataflowTemplate: "gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json" +tempLocation: "gs://datcom-store-resources/dataflow/tmp" dataPath: "gs://datcom-store" controlPath: "gs://datcom-control/base" +appProfileID: "batch" diff --git a/gcf/base/gcp/config/branch.yaml b/gcf/base/gcp/config/branch.yaml index fd4dc40b..7ae5f6eb 100644 --- a/gcf/base/gcp/config/branch.yaml +++ b/gcf/base/gcp/config/branch.yaml @@ -1,6 +1,8 @@ projectID: "datcom-store" instance: "prophet-branch-cache" cluster: "prophet-branch-cache-c1" -dataflowTemplate: "gs://datcom-templates/templates/csv_to_bt_0.0.3" +dataflowTemplate: "gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json" +tempLocation: "gs://datcom-store-resources/dataflow/tmp" dataPath: "gs://datcom-store" controlPath: "gs://datcom-control/branch" +appProfileID: "batch" diff --git a/gcf/base/local/deploy.sh b/gcf/base/local/deploy.sh index f98e0b30..f180f139 100755 --- a/gcf/base/local/deploy.sh +++ b/gcf/base/local/deploy.sh @@ -27,10 +27,10 @@ if [[ "$1" == "flex" ]]; then fi # Good to read the yaml file keys and convert them to bash array -for var in projectID cluster instance dataflowTemplate dataPath controlPath +for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID do value=$(yq eval .$var $config_file) export $var=$value done -go run main.go \ No newline at end of file +go run main.go diff --git a/gcf/base/local/local_flex.yaml b/gcf/base/local/local_flex.yaml index 86c6509b..efe4e86f 100644 --- a/gcf/base/local/local_flex.yaml +++ b/gcf/base/local/local_flex.yaml @@ -7,3 +7,4 @@ dataflowTemplate: "gs://datcom-dataflow-templates/templates/flex/csv_to_bt.json" dataPath: "gs://prophet_cache" controlPath: "gs://automation_control_test/branch" tempLocation: "gs://datcom-store-dev-resources/dataflow/tmp" +appProfileID: "batch" diff --git a/gcf/custom.go b/gcf/custom.go index a4a77f83..dec0ad78 100644 --- a/gcf/custom.go +++ b/gcf/custom.go @@ -78,7 +78,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error { } dataPath := lib.JoinURL(rootFolder, "cache") controlPath := lib.JoinURL(rootFolder, "control") - err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate) + err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "") if err != nil { if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil { log.Printf("Failed to delete BT table on failed Dataflow launch: %v", errDeleteBT) diff --git a/gcf/lib/dataflow_launcher.go b/gcf/lib/dataflow_launcher.go index fb757aaf..d0fcea41 100644 --- a/gcf/lib/dataflow_launcher.go +++ b/gcf/lib/dataflow_launcher.go @@ -57,6 +57,7 @@ func LaunchDataflowJob( dataPath string, controlPath string, dataflowTemplate string, + appProfileID string, ) error { // Flex templates are json based templates. // Please see the README under java/dataflow in this repo. @@ -65,6 +66,7 @@ func LaunchDataflowJob( return launchFromFlexTemplate( ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, + appProfileID, ) } @@ -72,6 +74,7 @@ func LaunchDataflowJob( return launchFromClassicTemplate( ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, + appProfileID, ) } @@ -90,6 +93,7 @@ func launchFromFlexTemplate( dataPath string, controlPath string, dataflowTemplate string, + appProfileID string, ) error { dataflowService, err := dataflow.NewService(ctx) if err != nil { @@ -112,12 +116,13 @@ func launchFromFlexTemplate( params := &dataflow.LaunchFlexTemplateParameter{ JobName: jobName, Parameters: map[string]string{ - "inputFile": dataFile, - "completionFile": completedPath, - "bigtableInstanceId": instance, - "bigtableTableId": tableID, - "bigtableProjectId": projectID, - "tempLocation": tempLocation, + "inputFile": dataFile, + "completionFile": completedPath, + "bigtableInstanceId": instance, + "bigtableTableId": tableID, + "bigtableProjectId": projectID, + "tempLocation": tempLocation, + "bigtableAppProfileId": appProfileID, }, ContainerSpecGcsPath: dataflowTemplate, } @@ -148,6 +153,7 @@ func launchFromClassicTemplate( dataPath string, controlPath string, dataflowTemplate string, + appProfileID string, ) error { dataflowService, err := dataflow.NewService(ctx) if err != nil { @@ -159,12 +165,13 @@ func launchFromClassicTemplate( params := &dataflow.LaunchTemplateParameters{ JobName: tableID, Parameters: map[string]string{ - "inputFile": dataFile, - "completionFile": completedPath, - "bigtableInstanceId": instance, - "bigtableTableId": tableID, - "bigtableProjectId": projectID, - "region": region, + "inputFile": dataFile, + "completionFile": completedPath, + "bigtableInstanceId": instance, + "bigtableTableId": tableID, + "bigtableProjectId": projectID, + "region": region, + "bigtableAppProfileId": appProfileID, }, } log.Printf("[%s/%s] Launching dataflow job: %s -> %s\n", instance, tableID, dataFile, launchedPath)