From ff78d0c087d2518f1b320b339dcd20583f217c3b Mon Sep 17 00:00:00 2001
From: natalie <77713883+n-h-diaz@users.noreply.github.com>
Date: Fri, 9 Feb 2024 17:11:07 -0800
Subject: [PATCH] change sdk to 2.43.0 with BigtableOptions and set
maxNumWorkers to 1000 (#252)
* change sdk to 2.43.0 with BigtableOptions and set maxNumWorkers to 1000
* change name to dataflowMaxNumWorkers
---
.../dataflow/csv2bt-template-metadata.json | 7 ++++
bigtable_automation/java/dataflow/pom.xml | 6 +--
.../dataflow/CloudBigtableOptions.java | 6 +++
.../org/datacommons/dataflow/CsvImport.java | 11 +++++-
gcf/base.go | 3 +-
gcf/base/gcp/config/base.yaml | 1 +
gcf/base/gcp/config/branch.yaml | 1 +
gcf/base/local/deploy.sh | 2 +-
gcf/base/local/local_flex.yaml | 1 +
gcf/custom.go | 3 +-
gcf/lib/dataflow_launcher.go | 37 +++++++++++--------
11 files changed, 54 insertions(+), 24 deletions(-)
diff --git a/bigtable_automation/java/dataflow/csv2bt-template-metadata.json b/bigtable_automation/java/dataflow/csv2bt-template-metadata.json
index bab8eae..4d8f828 100644
--- a/bigtable_automation/java/dataflow/csv2bt-template-metadata.json
+++ b/bigtable_automation/java/dataflow/csv2bt-template-metadata.json
@@ -39,6 +39,13 @@
"paramType": "TEXT",
"isOptional": true
},
+ {
+ "name": "dataflowMaxNumWorkers",
+ "label": "Dataflow maximum number of workers",
+ "helpText": "The maximum number of workers for Dataflow jobs.",
+ "paramType": "TEXT",
+ "isOptional": true
+ },
{
"name": "tempLocation",
"label": "GCP temp location",
diff --git a/bigtable_automation/java/dataflow/pom.xml b/bigtable_automation/java/dataflow/pom.xml
index eb70ac0..32ad575 100644
--- a/bigtable_automation/java/dataflow/pom.xml
+++ b/bigtable_automation/java/dataflow/pom.xml
@@ -7,11 +7,11 @@
jar
- 2.53.0
+ 2.43.0
1.4.0
1.7.21
- 1.7
- 1.7
+ 1.8
+ 1.8
3.2.4
diff --git a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java
index 89e1ee8..e3b3b9e 100644
--- a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java
+++ b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CloudBigtableOptions.java
@@ -28,4 +28,10 @@ public interface CloudBigtableOptions extends DataflowPipelineOptions {
@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> bigtableAppProfileId);
+
+ @Description("The maximum number of workers for Dataflow jobs.")
+ ValueProvider<String> getDataflowMaxNumWorkers();
+
+ @SuppressWarnings("unused")
+ void setDataflowMaxNumWorkers(ValueProvider<String> dataflowMaxNumWorkers);
}
diff --git a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java
index e18045a..16ed4cd 100644
--- a/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java
+++ b/bigtable_automation/java/dataflow/src/main/java/org/datacommons/dataflow/CsvImport.java
@@ -2,6 +2,7 @@
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Mutation.SetCell;
+import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
@@ -117,7 +118,10 @@ public static interface BigtableCsvOptions extends CloudBigtableOptions {
public static void main(String[] args) throws IllegalArgumentException {
BigtableCsvOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableCsvOptions.class);
-
+ if (options.getDataflowMaxNumWorkers().isAccessible() &&
+ StringUtils.isNotEmpty(options.getDataflowMaxNumWorkers().get())) {
+ options.setMaxNumWorkers(Integer.parseInt(options.getDataflowMaxNumWorkers().get()));
+ }
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<ByteString, Iterable<Mutation>>> cacheData = pipeline
.apply("ReadMyFile", TextIO.read().from(options.getInputFile()).withHintMatchesManyFiles())
@@ -129,7 +133,10 @@ public static void main(String[] args) throws IllegalArgumentException {
.withTableId(options.getBigtableTableId());
if (options.getBigtableAppProfileId().isAccessible() &&
StringUtils.isNotEmpty(options.getBigtableAppProfileId().get())) {
- write = write.withAppProfileId(options.getBigtableAppProfileId());
+ BigtableOptions bigtableOptions = BigtableOptions.builder()
+ .setAppProfileId(options.getBigtableAppProfileId().get())
+ .build();
+ write = write.withBigtableOptions(bigtableOptions);
}
// Write with results.
PCollection<BigtableWriteResult> writeResults = cacheData
diff --git a/gcf/base.go b/gcf/base.go
index b2eefa9..8b7df62 100644
--- a/gcf/base.go
+++ b/gcf/base.go
@@ -32,6 +32,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
dataPath := os.Getenv("dataPath")
controlPath := os.Getenv("controlPath")
appProfileID := os.Getenv("appProfileID")
+ maxNumWorkers := os.Getenv("maxNumWorkers")
if projectID == "" {
return errors.New("projectID is not set in environment")
}
@@ -87,7 +88,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
if err := lib.SetupBT(ctx, projectID, instance, tableID); err != nil {
return err
}
- err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID)
+ err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID, maxNumWorkers)
if err != nil {
if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
log.Printf("Failed to delete BT table on failed GCS write: %v", errDeleteBT)
diff --git a/gcf/base/gcp/config/base.yaml b/gcf/base/gcp/config/base.yaml
index 2878148..f5766fe 100644
--- a/gcf/base/gcp/config/base.yaml
+++ b/gcf/base/gcp/config/base.yaml
@@ -6,3 +6,4 @@ tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/base"
appProfileID: "batch"
+maxNumWorkers: 1000
diff --git a/gcf/base/gcp/config/branch.yaml b/gcf/base/gcp/config/branch.yaml
index 7ae5f6e..8de26a5 100644
--- a/gcf/base/gcp/config/branch.yaml
+++ b/gcf/base/gcp/config/branch.yaml
@@ -6,3 +6,4 @@ tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/branch"
appProfileID: "batch"
+maxNumWorkers: 1000
diff --git a/gcf/base/local/deploy.sh b/gcf/base/local/deploy.sh
index f180f13..a56a8d8 100755
--- a/gcf/base/local/deploy.sh
+++ b/gcf/base/local/deploy.sh
@@ -27,7 +27,7 @@ if [[ "$1" == "flex" ]]; then
fi
# Good to read the yaml file keys and convert them to bash array
-for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID
+for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID maxNumWorkers
do
value=$(yq eval .$var $config_file)
export $var=$value
diff --git a/gcf/base/local/local_flex.yaml b/gcf/base/local/local_flex.yaml
index efe4e86..cd81610 100644
--- a/gcf/base/local/local_flex.yaml
+++ b/gcf/base/local/local_flex.yaml
@@ -8,3 +8,4 @@ dataPath: "gs://prophet_cache"
controlPath: "gs://automation_control_test/branch"
tempLocation: "gs://datcom-store-dev-resources/dataflow/tmp"
appProfileID: "batch"
+maxNumWorkers: 1000
diff --git a/gcf/custom.go b/gcf/custom.go
index dec0ad7..d1c4c97 100644
--- a/gcf/custom.go
+++ b/gcf/custom.go
@@ -36,6 +36,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error {
instance := os.Getenv("instance")
cluster := os.Getenv("cluster")
dataflowTemplate := os.Getenv("dataflowTemplate")
+ maxNumWorkers := os.Getenv("maxNumWorkers")
if projectID == "" {
return errors.New("projectID is not set in environment")
}
@@ -78,7 +79,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error {
}
dataPath := lib.JoinURL(rootFolder, "cache")
controlPath := lib.JoinURL(rootFolder, "control")
- err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "")
+ err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "", maxNumWorkers)
if err != nil {
if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
log.Printf("Failed to delete BT table on failed Dataflow launch: %v", errDeleteBT)
diff --git a/gcf/lib/dataflow_launcher.go b/gcf/lib/dataflow_launcher.go
index d0fcea4..41387d4 100644
--- a/gcf/lib/dataflow_launcher.go
+++ b/gcf/lib/dataflow_launcher.go
@@ -58,6 +58,7 @@ func LaunchDataflowJob(
controlPath string,
dataflowTemplate string,
appProfileID string,
+ maxNumWorkers string,
) error {
// Flex templates are json based templates.
// Please see the README under java/dataflow in this repo.
@@ -66,7 +67,7 @@ func LaunchDataflowJob(
return launchFromFlexTemplate(
ctx, projectID, instance, tableID,
dataPath, controlPath, dataflowTemplate,
- appProfileID,
+ appProfileID, maxNumWorkers,
)
}
@@ -74,7 +75,7 @@ func LaunchDataflowJob(
return launchFromClassicTemplate(
ctx, projectID, instance, tableID,
dataPath, controlPath, dataflowTemplate,
- appProfileID,
+ appProfileID, maxNumWorkers,
)
}
@@ -94,6 +95,7 @@ func launchFromFlexTemplate(
controlPath string,
dataflowTemplate string,
appProfileID string,
+ maxNumWorkers string,
) error {
dataflowService, err := dataflow.NewService(ctx)
if err != nil {
@@ -116,13 +118,14 @@ func launchFromFlexTemplate(
params := &dataflow.LaunchFlexTemplateParameter{
JobName: jobName,
Parameters: map[string]string{
- "inputFile": dataFile,
- "completionFile": completedPath,
- "bigtableInstanceId": instance,
- "bigtableTableId": tableID,
- "bigtableProjectId": projectID,
- "tempLocation": tempLocation,
- "bigtableAppProfileId": appProfileID,
+ "inputFile": dataFile,
+ "completionFile": completedPath,
+ "bigtableInstanceId": instance,
+ "bigtableTableId": tableID,
+ "bigtableProjectId": projectID,
+ "tempLocation": tempLocation,
+ "bigtableAppProfileId": appProfileID,
+ "dataflowMaxNumWorkers": maxNumWorkers,
},
ContainerSpecGcsPath: dataflowTemplate,
}
@@ -154,6 +157,7 @@ func launchFromClassicTemplate(
controlPath string,
dataflowTemplate string,
appProfileID string,
+ maxNumWorkers string,
) error {
dataflowService, err := dataflow.NewService(ctx)
if err != nil {
@@ -165,13 +169,14 @@ func launchFromClassicTemplate(
params := &dataflow.LaunchTemplateParameters{
JobName: tableID,
Parameters: map[string]string{
- "inputFile": dataFile,
- "completionFile": completedPath,
- "bigtableInstanceId": instance,
- "bigtableTableId": tableID,
- "bigtableProjectId": projectID,
- "region": region,
- "bigtableAppProfileId": appProfileID,
+ "inputFile": dataFile,
+ "completionFile": completedPath,
+ "bigtableInstanceId": instance,
+ "bigtableTableId": tableID,
+ "bigtableProjectId": projectID,
+ "region": region,
+ "bigtableAppProfileId": appProfileID,
+ "dataflowMaxNumWorkers": maxNumWorkers,
},
}
log.Printf("[%s/%s] Launching dataflow job: %s -> %s\n", instance, tableID, dataFile, launchedPath)