diff --git a/bigtable_automation/java/dataflow/csv2bt-template-metadata.json b/bigtable_automation/java/dataflow/csv2bt-template-metadata.json
index bddbeb75..bab8eae4 100644
--- a/bigtable_automation/java/dataflow/csv2bt-template-metadata.json
+++ b/bigtable_automation/java/dataflow/csv2bt-template-metadata.json
@@ -32,6 +32,13 @@
"helpText": "The Cloud Bigtable table ID in the instance.",
"paramType": "TEXT"
},
+ {
+ "name": "bigtableAppProfileId",
+ "label": "Bigtable App Profile ID",
+ "helpText": "The App Profile ID to use for Bigtable writes.",
+ "paramType": "TEXT",
+ "isOptional": true
+ },
{
"name": "tempLocation",
"label": "GCP temp location",
@@ -39,4 +46,4 @@
"paramType": "TEXT"
}
]
-}
\ No newline at end of file
+}
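The metadata change above registers `bigtableAppProfileId` as an optional template parameter, so existing launches that omit it keep working. A launch that pins the app profile might look like the sketch below; the job name, input file, completion file, and table ID are placeholders, while the template path and parameter names come from this change:

```bash
# Sketch only: the job name and GCS data paths are hypothetical.
gcloud dataflow flex-template run csv2bt-example \
  --template-file-gcs-location=gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json \
  --region=us-central1 \
  --parameters=\
inputFile=gs://datcom-store/example/cache.csv*,\
completionFile=gs://datcom-control/base/example/completed.txt,\
bigtableProjectId=datcom-store,\
bigtableInstanceId=prophet-cache,\
bigtableTableId=example_2024_01_01,\
tempLocation=gs://datcom-store-resources/dataflow/tmp,\
bigtableAppProfileId=batch
```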
diff --git a/bigtable_automation/java/dataflow/pom.xml b/bigtable_automation/java/dataflow/pom.xml
index 61e44cc5..eb70ac00 100644
--- a/bigtable_automation/java/dataflow/pom.xml
+++ b/bigtable_automation/java/dataflow/pom.xml
@@ -7,7 +7,7 @@
<packaging>jar</packaging>
- <beam.version>2.43.0</beam.version>
+ <beam.version>2.53.0</beam.version>
<bigtable.version>1.4.0</bigtable.version>
<slf4j.version>1.7.21</slf4j.version>
<java.version>1.7</java.version>
diff --git a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java
index c67860c3..89e1ee8d 100644
--- a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java
+++ b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java
@@ -23,4 +23,9 @@ public interface CloudBigtableOptions extends DataflowPipelineOptions {
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> bigtableTableId);
+ @Description("The App Profile ID to use for Bigtable writes.")
+ ValueProvider<String> getBigtableAppProfileId();
+
+ @SuppressWarnings("unused")
+ void setBigtableAppProfileId(ValueProvider<String> bigtableAppProfileId);
}
diff --git a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java
index ff0a8c5a..e18045a0 100644
--- a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java
+++ b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.shaded.org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,6 +127,10 @@ public static void main(String[] args) throws IllegalArgumentException {
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
+ if (options.getBigtableAppProfileId().isAccessible() &&
+ StringUtils.isNotEmpty(options.getBigtableAppProfileId().get())) {
+ write = write.withAppProfileId(options.getBigtableAppProfileId());
+ }
// Write with results.
PCollection<BigtableWriteResult> writeResults = cacheData
.apply("WriteToBigtable", write.withWriteResults());
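The guard on `isAccessible()` matters because with `ValueProvider` the app profile may only be known at launch time; the pipeline applies `withAppProfileId` only when a non-empty value is actually available, and otherwise writes through the instance's default profile. For local testing, a DirectRunner invocation along these lines should exercise the new option (a sketch; the project, instance, table, and GCS paths are placeholders, and it assumes `exec-maven-plugin` can run from this pom):

```bash
mvn compile exec:java \
  -Dexec.mainClass=org.datacommons.dataflow.CsvImport \
  -Dexec.args="--runner=DirectRunner \
    --bigtableProjectId=my-project \
    --bigtableInstanceId=my-instance \
    --bigtableTableId=my_table \
    --bigtableAppProfileId=batch \
    --inputFile=gs://my-bucket/cache.csv* \
    --completionFile=gs://my-bucket/completed.txt \
    --tempLocation=gs://my-bucket/tmp"
```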
diff --git a/gcf/README.md b/gcf/README.md
index a1ee6e76..6049affd 100644
--- a/gcf/README.md
+++ b/gcf/README.md
@@ -28,7 +28,7 @@ production entry point can be tested locally through `local/main.go`
0. First start the Cloud Function locally, as follows:
```bash
- ./local/deploy.sh
+ ./base/local/deploy.sh
```
1. A test branch cache exists in this [folder](https://pantheon.corp.google.com/storage/browser/prophet_cache/dcbranch_2022_05_06_16_16_13).
@@ -36,13 +36,13 @@ production entry point can be tested locally through `local/main.go`
Just in case a test was run with that cache before, clean up first:
```bash
- ./local/test.sh cleanup
+ ./base/local/test.sh cleanup
```
2. Fake an init trigger from the Google pipeline:
```bash
- ./local/test.sh init
+ ./base/local/test.sh init
```
To validate this step:
@@ -62,7 +62,7 @@ production entry point can be tested locally through `local/main.go`
3. Fake a completion trigger from the Dataflow job:
```bash
- ./local/test.sh completed
+ ./base/local/test.sh completed
```
Validate this step by confirming that the
@@ -74,8 +74,8 @@ production entry point can be tested locally through `local/main.go`
After validating the change in the test environment, deploy to PROD by running:
```bash
-./prod/deploy.sh base
-./prod/deploy.sh branch
+./base/gcp/deploy.sh base
+./base/gcp/deploy.sh branch
```
When this completes, look at the
@@ -86,5 +86,5 @@ To deploy private GCF, identify the environment, pick the corresponding yaml
files in `private/*.yaml` and run
```bash
-./private/deploy.sh
+./base/custom/deploy.sh
```
diff --git a/gcf/base.go b/gcf/base.go
index 7aee93e8..b2eefa9d 100644
--- a/gcf/base.go
+++ b/gcf/base.go
@@ -31,6 +31,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
dataflowTemplate := os.Getenv("dataflowTemplate")
dataPath := os.Getenv("dataPath")
controlPath := os.Getenv("controlPath")
+ appProfileID := os.Getenv("appProfileID")
if projectID == "" {
return errors.New("projectID is not set in environment")
}
@@ -86,7 +87,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
if err := lib.SetupBT(ctx, projectID, instance, tableID); err != nil {
return err
}
- err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate)
+ err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID)
if err != nil {
if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
log.Printf("Failed to delete BT table on failed GCS write: %v", errDeleteBT)
diff --git a/gcf/base/gcp/config/base.yaml b/gcf/base/gcp/config/base.yaml
index a28b85d2..2878148f 100644
--- a/gcf/base/gcp/config/base.yaml
+++ b/gcf/base/gcp/config/base.yaml
@@ -2,5 +2,7 @@ projectID: "datcom-store"
instance: "prophet-cache"
cluster: "prophet-cache-c1"
dataflowTemplate: "gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json"
+tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/base"
+appProfileID: "batch"
diff --git a/gcf/base/gcp/config/branch.yaml b/gcf/base/gcp/config/branch.yaml
index fd4dc40b..7ae5f6eb 100644
--- a/gcf/base/gcp/config/branch.yaml
+++ b/gcf/base/gcp/config/branch.yaml
@@ -1,6 +1,8 @@
projectID: "datcom-store"
instance: "prophet-branch-cache"
cluster: "prophet-branch-cache-c1"
-dataflowTemplate: "gs://datcom-templates/templates/csv_to_bt_0.0.3"
+dataflowTemplate: "gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json"
+tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/branch"
+appProfileID: "batch"
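Both configs now route writes through a `batch` app profile, which has to exist on each instance before a job runs. Below is a sketch of creating it on the base instance, assuming single-cluster routing to the configured cluster (this change does not say which routing policy the profile should use):

```bash
# Assumption: single-cluster routing to the cluster named in base.yaml.
gcloud bigtable app-profiles create batch \
  --instance=prophet-cache \
  --route-to=prophet-cache-c1 \
  --description="App profile for batch cache loads"
```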
diff --git a/gcf/base/local/deploy.sh b/gcf/base/local/deploy.sh
index f98e0b30..f180f139 100755
--- a/gcf/base/local/deploy.sh
+++ b/gcf/base/local/deploy.sh
@@ -27,10 +27,10 @@ if [[ "$1" == "flex" ]]; then
fi
# Good to read the yaml file keys and convert them to a bash array
-for var in projectID cluster instance dataflowTemplate dataPath controlPath
+for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID
do
value=$(yq eval .$var $config_file)
export $var=$value
done
-go run main.go
\ No newline at end of file
+go run main.go
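The loop exports each yaml key as an identically named environment variable, which is how `prodInternal` in `gcf/base.go` later picks up `appProfileID` via `os.Getenv`. A quick sanity check that every key resolves (a sketch, assuming it is run from `gcf/`; the config path matches this change):

```bash
for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID
do
  # appProfileID should print "batch" for the base config.
  echo "$var=$(yq eval .$var base/gcp/config/base.yaml)"
done
```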
diff --git a/gcf/base/local/local_flex.yaml b/gcf/base/local/local_flex.yaml
index 86c6509b..efe4e86f 100644
--- a/gcf/base/local/local_flex.yaml
+++ b/gcf/base/local/local_flex.yaml
@@ -7,3 +7,4 @@ dataflowTemplate: "gs://datcom-dataflow-templates/templates/flex/csv_to_bt.json"
dataPath: "gs://prophet_cache"
controlPath: "gs://automation_control_test/branch"
tempLocation: "gs://datcom-store-dev-resources/dataflow/tmp"
+appProfileID: "batch"
diff --git a/gcf/custom.go b/gcf/custom.go
index a4a77f83..dec0ad78 100644
--- a/gcf/custom.go
+++ b/gcf/custom.go
@@ -78,7 +78,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error {
}
dataPath := lib.JoinURL(rootFolder, "cache")
controlPath := lib.JoinURL(rootFolder, "control")
- err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate)
+ err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "")
if err != nil {
if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
log.Printf("Failed to delete BT table on failed Dataflow launch: %v", errDeleteBT)
diff --git a/gcf/lib/dataflow_launcher.go b/gcf/lib/dataflow_launcher.go
index fb757aaf..d0fcea41 100644
--- a/gcf/lib/dataflow_launcher.go
+++ b/gcf/lib/dataflow_launcher.go
@@ -57,6 +57,7 @@ func LaunchDataflowJob(
dataPath string,
controlPath string,
dataflowTemplate string,
+ appProfileID string,
) error {
// Flex templates are json based templates.
// Please see the README under java/dataflow in this repo.
@@ -65,6 +66,7 @@ func LaunchDataflowJob(
return launchFromFlexTemplate(
ctx, projectID, instance, tableID,
dataPath, controlPath, dataflowTemplate,
+ appProfileID,
)
}
@@ -72,6 +74,7 @@ func LaunchDataflowJob(
return launchFromClassicTemplate(
ctx, projectID, instance, tableID,
dataPath, controlPath, dataflowTemplate,
+ appProfileID,
)
}
@@ -90,6 +93,7 @@ func launchFromFlexTemplate(
dataPath string,
controlPath string,
dataflowTemplate string,
+ appProfileID string,
) error {
dataflowService, err := dataflow.NewService(ctx)
if err != nil {
@@ -112,12 +116,13 @@ func launchFromFlexTemplate(
params := &dataflow.LaunchFlexTemplateParameter{
JobName: jobName,
Parameters: map[string]string{
- "inputFile": dataFile,
- "completionFile": completedPath,
- "bigtableInstanceId": instance,
- "bigtableTableId": tableID,
- "bigtableProjectId": projectID,
- "tempLocation": tempLocation,
+ "inputFile": dataFile,
+ "completionFile": completedPath,
+ "bigtableInstanceId": instance,
+ "bigtableTableId": tableID,
+ "bigtableProjectId": projectID,
+ "tempLocation": tempLocation,
+ "bigtableAppProfileId": appProfileID,
},
ContainerSpecGcsPath: dataflowTemplate,
}
@@ -148,6 +153,7 @@ func launchFromClassicTemplate(
dataPath string,
controlPath string,
dataflowTemplate string,
+ appProfileID string,
) error {
dataflowService, err := dataflow.NewService(ctx)
if err != nil {
@@ -159,12 +165,13 @@ func launchFromClassicTemplate(
params := &dataflow.LaunchTemplateParameters{
JobName: tableID,
Parameters: map[string]string{
- "inputFile": dataFile,
- "completionFile": completedPath,
- "bigtableInstanceId": instance,
- "bigtableTableId": tableID,
- "bigtableProjectId": projectID,
- "region": region,
+ "inputFile": dataFile,
+ "completionFile": completedPath,
+ "bigtableInstanceId": instance,
+ "bigtableTableId": tableID,
+ "bigtableProjectId": projectID,
+ "region": region,
+ "bigtableAppProfileId": appProfileID,
},
}
log.Printf("[%s/%s] Launching dataflow job: %s -> %s\n", instance, tableID, dataFile, launchedPath)
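With this change, both launch paths always put `bigtableAppProfileId` in the parameter map, even when it is empty (the custom GCF passes `""`); the guard in `CsvImport.java` then skips `withAppProfileId` for empty values, so such jobs still write through the default profile. If a deployed classic template predates this parameter, the launch may reject the unknown key, in which case adding the entry only when `appProfileID` is non-empty could be the safer variant. For reference, a rough command-line equivalent of `launchFromClassicTemplate` (the job name, data file, and completion path are placeholders):

```bash
gcloud dataflow jobs run branch_2024_01_01 \
  --gcs-location=gs://datcom-templates/templates/csv_to_bt_0.0.3 \
  --region=us-central1 \
  --parameters=\
inputFile=gs://datcom-store/branch_2024_01_01/cache.csv*,\
completionFile=gs://datcom-control/branch/branch_2024_01_01/completed.txt,\
bigtableInstanceId=prophet-branch-cache,\
bigtableTableId=branch_2024_01_01,\
bigtableProjectId=datcom-store,\
bigtableAppProfileId=batch
```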