Skip to content

Commit

Permalink
change sdk to 2.43.0 with BigtableOptions and set maxNumWorkers to 10…
Browse files Browse the repository at this point in the history
…00 (#252)

* change sdk to 2.43.0 with BigtableOptions and set maxNumWorkers to 1000

* change name to dataflowMaxNumWorkers
  • Loading branch information
n-h-diaz authored Feb 10, 2024
1 parent 749e01e commit ff78d0c
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@
"paramType": "TEXT",
"isOptional": true
},
{
"name": "dataflowMaxNumWorkers",
"label": "Dataflow maximum number of workers",
"helpText": "The maximum number of workers for Dataflow jobs.",
"paramType": "TEXT",
"isOptional": true
},
{
"name": "tempLocation",
"label": "GCP temp location",
Expand Down
6 changes: 3 additions & 3 deletions bigtable_automation/java/dataflow/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
<packaging>jar</packaging>

<properties>
<beam.version>2.53.0</beam.version>
<beam.version>2.43.0</beam.version>
<bigtable.version>1.4.0</bigtable.version>
<slf4j.version>1.7.21</slf4j.version>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ public interface CloudBigtableOptions extends DataflowPipelineOptions {

@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> bigtableAppProfileId);

@Description("The maximum number of workers for Dataflow jobs.")
ValueProvider<String> getDataflowMaxNumWorkers();

@SuppressWarnings("unused")
void setDataflowMaxNumWorkers(ValueProvider<String> dataflowMaxNumWorkers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Mutation.SetCell;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -117,7 +118,10 @@ public static interface BigtableCsvOptions extends CloudBigtableOptions {
public static void main(String[] args) throws IllegalArgumentException {
BigtableCsvOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableCsvOptions.class);

if (options.getDataflowMaxNumWorkers().isAccessible() &&
StringUtils.isNotEmpty(options.getDataflowMaxNumWorkers().get())) {
options.setMaxNumWorkers(Integer.parseInt(options.getDataflowMaxNumWorkers().get()));
}
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<ByteString, Iterable<Mutation>>> cacheData = pipeline
.apply("ReadMyFile", TextIO.read().from(options.getInputFile()).withHintMatchesManyFiles())
Expand All @@ -129,7 +133,10 @@ public static void main(String[] args) throws IllegalArgumentException {
.withTableId(options.getBigtableTableId());
if (options.getBigtableAppProfileId().isAccessible() &&
StringUtils.isNotEmpty(options.getBigtableAppProfileId().get())) {
write = write.withAppProfileId(options.getBigtableAppProfileId());
BigtableOptions bigtableOptions = BigtableOptions.builder()
.setAppProfileId(options.getBigtableAppProfileId().get())
.build();
write = write.withBigtableOptions(bigtableOptions);
}
// Write with results.
PCollection<BigtableWriteResult> writeResults = cacheData
Expand Down
3 changes: 2 additions & 1 deletion gcf/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
dataPath := os.Getenv("dataPath")
controlPath := os.Getenv("controlPath")
appProfileID := os.Getenv("appProfileID")
maxNumWorkers := os.Getenv("maxNumWorkers")
if projectID == "" {
return errors.New("projectID is not set in environment")
}
Expand Down Expand Up @@ -87,7 +88,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
if err := lib.SetupBT(ctx, projectID, instance, tableID); err != nil {
return err
}
err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID)
err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID, maxNumWorkers)
if err != nil {
if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
log.Printf("Failed to delete BT table on failed GCS write: %v", errDeleteBT)
Expand Down
1 change: 1 addition & 0 deletions gcf/base/gcp/config/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/base"
appProfileID: "batch"
maxNumWorkers: 1000
1 change: 1 addition & 0 deletions gcf/base/gcp/config/branch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/branch"
appProfileID: "batch"
maxNumWorkers: 1000
2 changes: 1 addition & 1 deletion gcf/base/local/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ if [[ "$1" == "flex" ]]; then
fi

# Good to read the yaml file keys and convert them to bash array
for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID
for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID maxNumWorkers
do
value=$(yq eval .$var $config_file)
export $var=$value
Expand Down
1 change: 1 addition & 0 deletions gcf/base/local/local_flex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ dataPath: "gs://prophet_cache"
controlPath: "gs://automation_control_test/branch"
tempLocation: "gs://datcom-store-dev-resources/dataflow/tmp"
appProfileID: "batch"
maxNumWorkers: 1000
3 changes: 2 additions & 1 deletion gcf/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error {
instance := os.Getenv("instance")
cluster := os.Getenv("cluster")
dataflowTemplate := os.Getenv("dataflowTemplate")
maxNumWorkers := os.Getenv("maxNumWorkers")
if projectID == "" {
return errors.New("projectID is not set in environment")
}
Expand Down Expand Up @@ -78,7 +79,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error {
}
dataPath := lib.JoinURL(rootFolder, "cache")
controlPath := lib.JoinURL(rootFolder, "control")
err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "")
err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "", maxNumWorkers)
if err != nil {
if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
log.Printf("Failed to delete BT table on failed Dataflow launch: %v", errDeleteBT)
Expand Down
37 changes: 21 additions & 16 deletions gcf/lib/dataflow_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func LaunchDataflowJob(
controlPath string,
dataflowTemplate string,
appProfileID string,
maxNumWorkers string,
) error {
// Flex templates are json based templates.
// Please see the README under java/dataflow in this repo.
Expand All @@ -66,15 +67,15 @@ func LaunchDataflowJob(
return launchFromFlexTemplate(
ctx, projectID, instance, tableID,
dataPath, controlPath, dataflowTemplate,
appProfileID,
appProfileID, maxNumWorkers,
)
}

log.Printf("Launching Classic Template: %s", dataflowTemplate)
return launchFromClassicTemplate(
ctx, projectID, instance, tableID,
dataPath, controlPath, dataflowTemplate,
appProfileID,
appProfileID, maxNumWorkers,
)
}

Expand All @@ -94,6 +95,7 @@ func launchFromFlexTemplate(
controlPath string,
dataflowTemplate string,
appProfileID string,
maxNumWorkers string,
) error {
dataflowService, err := dataflow.NewService(ctx)
if err != nil {
Expand All @@ -116,13 +118,14 @@ func launchFromFlexTemplate(
params := &dataflow.LaunchFlexTemplateParameter{
JobName: jobName,
Parameters: map[string]string{
"inputFile": dataFile,
"completionFile": completedPath,
"bigtableInstanceId": instance,
"bigtableTableId": tableID,
"bigtableProjectId": projectID,
"tempLocation": tempLocation,
"bigtableAppProfileId": appProfileID,
"inputFile": dataFile,
"completionFile": completedPath,
"bigtableInstanceId": instance,
"bigtableTableId": tableID,
"bigtableProjectId": projectID,
"tempLocation": tempLocation,
"bigtableAppProfileId": appProfileID,
"dataflowMaxNumWorkers": maxNumWorkers,
},
ContainerSpecGcsPath: dataflowTemplate,
}
Expand Down Expand Up @@ -154,6 +157,7 @@ func launchFromClassicTemplate(
controlPath string,
dataflowTemplate string,
appProfileID string,
maxNumWorkers string,
) error {
dataflowService, err := dataflow.NewService(ctx)
if err != nil {
Expand All @@ -165,13 +169,14 @@ func launchFromClassicTemplate(
params := &dataflow.LaunchTemplateParameters{
JobName: tableID,
Parameters: map[string]string{
"inputFile": dataFile,
"completionFile": completedPath,
"bigtableInstanceId": instance,
"bigtableTableId": tableID,
"bigtableProjectId": projectID,
"region": region,
"bigtableAppProfileId": appProfileID,
"inputFile": dataFile,
"completionFile": completedPath,
"bigtableInstanceId": instance,
"bigtableTableId": tableID,
"bigtableProjectId": projectID,
"region": region,
"bigtableAppProfileId": appProfileID,
"dataflowMaxNumWorkers": maxNumWorkers,
},
}
log.Printf("[%s/%s] Launching dataflow job: %s -> %s\n", instance, tableID, dataFile, launchedPath)
Expand Down

0 comments on commit ff78d0c

Please sign in to comment.