Plumb in Bigtable appProfileID "batch" to Base (#249)
* Plumb in Bigtable appProfileID "batch" to Base

* Make param optional

* Update branch.yaml to use flex.

* Add missing tempLocation
pradh authored Jan 10, 2024
1 parent 85f5137 commit 5f0901e
Showing 12 changed files with 56 additions and 26 deletions.
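The change plumbs a Bigtable app profile end to end: the flex template's metadata (first diff below) declares a new optional `bigtableAppProfileId` parameter, the Beam pipeline applies it to `BigtableIO` writes, and on the Cloud Function side the config yamls, `base.go`, and `lib.LaunchDataflowJob` pass an `appProfileID` environment setting through as that template parameter. Routing base-cache loads through a dedicated `batch` app profile presumably lets Bigtable route these bulk writes separately from serving traffic.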
@@ -32,11 +32,18 @@
"helpText": "The Cloud Bigtable table ID in the instance.",
"paramType": "TEXT"
},
{
"name": "bigtableAppProfileId",
"label": "Bigtable App Profile ID",
"helpText": "The App Profile ID to use for Bigtable writes.",
"paramType": "TEXT",
"isOptional": true
},
{
"name": "tempLocation",
"label": "GCP temp location",
"helpText": "A GCS bucket where the running jobs will have access to.",
"paramType": "TEXT"
}
]
}
}
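Since the parameter is declared `isOptional`, existing launches that omit `bigtableAppProfileId` keep working; as the pipeline change below shows, an unset or empty value leaves writes on the instance's default app profile.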
2 changes: 1 addition & 1 deletion bigtable_automation/java/dataflow/pom.xml
@@ -7,7 +7,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <beam.version>2.43.0</beam.version>
+    <beam.version>2.53.0</beam.version>
     <bigtable.version>1.4.0</bigtable.version>
     <slf4j.version>1.7.21</slf4j.version>
     <maven.compiler.source>1.7</maven.compiler.source>
@@ -23,4 +23,9 @@ public interface CloudBigtableOptions extends DataflowPipelineOptions {
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> bigtableTableId);

@Description("The App Profile to use for Bigtable writes.")
ValueProvider<String> getBigtableAppProfileId();

@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> bigtableAppProfileId);
}
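Both accessors use `ValueProvider<String>`, so the value is resolved at job run time rather than at template construction time; that is why the pipeline change below guards the read with `isAccessible()` before calling `get()`.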
@@ -25,6 +25,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.shaded.org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,6 +127,10 @@ public static void main(String[] args) throws IllegalArgumentException {
         .withProjectId(options.getBigtableProjectId())
         .withInstanceId(options.getBigtableInstanceId())
         .withTableId(options.getBigtableTableId());
+    if (options.getBigtableAppProfileId().isAccessible() &&
+        StringUtils.isNotEmpty(options.getBigtableAppProfileId().get())) {
+      write = write.withAppProfileId(options.getBigtableAppProfileId());
+    }
     // Write with results.
     PCollection<BigtableWriteResult> writeResults = cacheData
         .apply("WriteToBigtable", write.withWriteResults());
14 changes: 7 additions & 7 deletions gcf/README.md
@@ -28,21 +28,21 @@ production entry point can be tested locally through `local/main.go`
 0. First start the Cloud Function locally, as follows:
 
 ```bash
-./local/deploy.sh
+./base/local/deploy.sh
 ```
 
 1. A test branch cache exists in this [folder](https://pantheon.corp.google.com/storage/browser/prophet_cache/dcbranch_2022_05_06_16_16_13).
 
 Just in case a test was run with that cache before, clean-up first:
 
 ```bash
-./local/test.sh cleanup
+./base/local/test.sh cleanup
 ```
 
 2. Fake an init trigger from Google pipeline:
 
 ```bash
-./local/test.sh init
+./base/local/test.sh init
 ```
 
 To validate this step:
@@ -62,7 +62,7 @@ production entry point can be tested locally through `local/main.go`
 3. Fake a completion trigger from Dataflow job:
 
 ```bash
-./local/test.sh completed
+./base/local/test.sh completed
 ```
 
 Validate this step by confirming that the
@@ -74,8 +74,8 @@
 After validating the change in test environment, deploy to PROD by running:
 
 ```bash
-./prod/deploy.sh base
-./prod/deploy.sh branch
+./base/gcp/deploy.sh base
+./base/gcp/deploy.sh branch
 ```
 
 When this completes, look at the
@@ -86,5 +86,5 @@ To deploy private GCF, identify the environment, pick the corresponding yaml
 files in `private/*.yaml` and run
 
 ```bash
-./private/deploy.sh <env>
+./base/custom/deploy.sh <env>
 ```
3 changes: 2 additions & 1 deletion gcf/base.go
@@ -31,6 +31,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
 	dataflowTemplate := os.Getenv("dataflowTemplate")
 	dataPath := os.Getenv("dataPath")
 	controlPath := os.Getenv("controlPath")
+	appProfileID := os.Getenv("appProfileID")
 	if projectID == "" {
 		return errors.New("projectID is not set in environment")
 	}
@@ -86,7 +87,7 @@ func prodInternal(ctx context.Context, e lib.GCSEvent) error {
 	if err := lib.SetupBT(ctx, projectID, instance, tableID); err != nil {
 		return err
 	}
-	err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate)
+	err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, appProfileID)
 	if err != nil {
 		if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
 			log.Printf("Failed to delete BT table on failed GCS write: %v", errDeleteBT)
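For orientation, a minimal sketch of the new Cloud Function plumbing. The import path for `lib` is assumed (this repo's `gcf/lib` package), and the function name and `tableID` argument are illustrative only:

```go
package gcf

import (
	"context"
	"os"

	// Assumed import path for this repo's gcf/lib package.
	"github.com/datacommonsorg/tools/gcf/lib"
)

// launchWithAppProfile mirrors the new prodInternal flow: read the
// optional appProfileID from the environment and pass it through as
// the launcher's final argument. An empty value means "use the
// instance's default app profile".
func launchWithAppProfile(ctx context.Context, tableID string) error {
	return lib.LaunchDataflowJob(
		ctx,
		os.Getenv("projectID"),
		os.Getenv("instance"),
		tableID,
		os.Getenv("dataPath"),
		os.Getenv("controlPath"),
		os.Getenv("dataflowTemplate"),
		os.Getenv("appProfileID"), // the new parameter
	)
}
```

The deploy scripts export these variables from the config yamls (see `deploy.sh` below), so adding `appProfileID` to a yaml is all a new environment needs.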
2 changes: 2 additions & 0 deletions gcf/base/gcp/config/base.yaml
@@ -2,5 +2,7 @@ projectID: "datcom-store"
instance: "prophet-cache"
cluster: "prophet-cache-c1"
dataflowTemplate: "gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json"
tempLocation: "gs://datcom-store-resources/dataflow/tmp"
dataPath: "gs://datcom-store"
controlPath: "gs://datcom-control/base"
appProfileID: "batch"
4 changes: 3 additions & 1 deletion gcf/base/gcp/config/branch.yaml
@@ -1,6 +1,8 @@
 projectID: "datcom-store"
 instance: "prophet-branch-cache"
 cluster: "prophet-branch-cache-c1"
-dataflowTemplate: "gs://datcom-templates/templates/csv_to_bt_0.0.3"
+dataflowTemplate: "gs://datcom-templates/templates/flex/csv_to_bt_0.0.3.json"
+tempLocation: "gs://datcom-store-resources/dataflow/tmp"
 dataPath: "gs://datcom-store"
 controlPath: "gs://datcom-control/branch"
+appProfileID: "batch"
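Both prod configs now point at the flex template, set `tempLocation` for staging, and route writes through the `batch` app profile. That profile is assumed to already exist on the `prophet-cache` and `prophet-branch-cache` instances; app profiles are created out of band (e.g. with `gcloud bigtable app-profiles create`).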
4 changes: 2 additions & 2 deletions gcf/base/local/deploy.sh
@@ -27,10 +27,10 @@ if [[ "$1" == "flex" ]]; then
 fi
 
 # Good to read the yaml file keys and convert them to bash array
-for var in projectID cluster instance dataflowTemplate dataPath controlPath
+for var in projectID cluster instance dataflowTemplate dataPath controlPath appProfileID
 do
   value=$(yq eval .$var $config_file)
   export $var=$value
 done
 
-go run main.go
+go run main.go
1 change: 1 addition & 0 deletions gcf/base/local/local_flex.yaml
@@ -7,3 +7,4 @@ dataflowTemplate: "gs://datcom-dataflow-templates/templates/flex/csv_to_bt.json"
dataPath: "gs://prophet_cache"
controlPath: "gs://automation_control_test/branch"
tempLocation: "gs://datcom-store-dev-resources/dataflow/tmp"
appProfileID: "batch"
2 changes: 1 addition & 1 deletion gcf/custom.go
@@ -78,7 +78,7 @@ func customInternal(ctx context.Context, e lib.GCSEvent) error {
 	}
 	dataPath := lib.JoinURL(rootFolder, "cache")
 	controlPath := lib.JoinURL(rootFolder, "control")
-	err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate)
+	err = lib.LaunchDataflowJob(ctx, projectID, instance, tableID, dataPath, controlPath, dataflowTemplate, "")
 	if err != nil {
 		if errDeleteBT := lib.DeleteBTTable(ctx, projectID, instance, tableID); errDeleteBT != nil {
 			log.Printf("Failed to delete BT table on failed Dataflow launch: %v", errDeleteBT)
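Note that the custom (private) import path passes an empty app profile ID, so those writes keep using the instance's default profile; only the base/branch flow opts into `batch`.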
31 changes: 19 additions & 12 deletions gcf/lib/dataflow_launcher.go
@@ -57,6 +57,7 @@ func LaunchDataflowJob(
 	dataPath string,
 	controlPath string,
 	dataflowTemplate string,
+	appProfileID string,
 ) error {
 	// Flex templates are json based templates.
 	// Please see the README under java/dataflow in this repo.
@@ -65,13 +66,15 @@
 		return launchFromFlexTemplate(
 			ctx, projectID, instance, tableID,
 			dataPath, controlPath, dataflowTemplate,
+			appProfileID,
 		)
 	}
 
 	log.Printf("Launching Classic Template: %s", dataflowTemplate)
 	return launchFromClassicTemplate(
 		ctx, projectID, instance, tableID,
 		dataPath, controlPath, dataflowTemplate,
+		appProfileID,
 	)
 }
 
@@ -90,6 +93,7 @@ func launchFromFlexTemplate(
 	dataPath string,
 	controlPath string,
 	dataflowTemplate string,
+	appProfileID string,
 ) error {
 	dataflowService, err := dataflow.NewService(ctx)
 	if err != nil {
@@ -112,12 +116,13 @@
 	params := &dataflow.LaunchFlexTemplateParameter{
 		JobName: jobName,
 		Parameters: map[string]string{
-			"inputFile":          dataFile,
-			"completionFile":     completedPath,
-			"bigtableInstanceId": instance,
-			"bigtableTableId":    tableID,
-			"bigtableProjectId":  projectID,
-			"tempLocation":       tempLocation,
+			"inputFile":            dataFile,
+			"completionFile":       completedPath,
+			"bigtableInstanceId":   instance,
+			"bigtableTableId":      tableID,
+			"bigtableProjectId":    projectID,
+			"tempLocation":         tempLocation,
+			"bigtableAppProfileId": appProfileID,
 		},
 		ContainerSpecGcsPath: dataflowTemplate,
 	}
@@ -148,6 +153,7 @@ func launchFromClassicTemplate(
 	dataPath string,
 	controlPath string,
 	dataflowTemplate string,
+	appProfileID string,
 ) error {
 	dataflowService, err := dataflow.NewService(ctx)
 	if err != nil {
@@ -159,12 +165,13 @@
 	params := &dataflow.LaunchTemplateParameters{
 		JobName: tableID,
 		Parameters: map[string]string{
-			"inputFile":          dataFile,
-			"completionFile":     completedPath,
-			"bigtableInstanceId": instance,
-			"bigtableTableId":    tableID,
-			"bigtableProjectId":  projectID,
-			"region":             region,
+			"inputFile":            dataFile,
+			"completionFile":       completedPath,
+			"bigtableInstanceId":   instance,
+			"bigtableTableId":      tableID,
+			"bigtableProjectId":    projectID,
+			"region":               region,
+			"bigtableAppProfileId": appProfileID,
 		},
 	}
 	log.Printf("[%s/%s] Launching dataflow job: %s -> %s\n", instance, tableID, dataFile, launchedPath)
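To see where the new map entry lands, here is a self-contained sketch of a flex-template launch against `google.golang.org/api/dataflow/v1b3`; the project, region, bucket paths, and job name are placeholders, not values from this repo:

```go
package main

import (
	"context"
	"log"

	dataflow "google.golang.org/api/dataflow/v1b3"
)

func main() {
	ctx := context.Background()
	svc, err := dataflow.NewService(ctx)
	if err != nil {
		log.Fatalf("dataflow.NewService: %v", err)
	}
	// All identifiers below are placeholders.
	req := &dataflow.LaunchFlexTemplateRequest{
		LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
			JobName: "csv-to-bt-example",
			Parameters: map[string]string{
				"inputFile":            "gs://my-bucket/cache/*.csv",
				"completionFile":       "gs://my-bucket/control/completed",
				"bigtableProjectId":    "my-project",
				"bigtableInstanceId":   "my-instance",
				"bigtableTableId":      "my-table",
				"tempLocation":         "gs://my-bucket/dataflow/tmp",
				"bigtableAppProfileId": "batch", // the newly plumbed parameter
			},
			ContainerSpecGcsPath: "gs://my-bucket/templates/flex/csv_to_bt.json",
		},
	}
	resp, err := svc.Projects.Locations.FlexTemplates.Launch("my-project", "us-central1", req).Do()
	if err != nil {
		log.Fatalf("FlexTemplates.Launch: %v", err)
	}
	log.Printf("Launched job %s", resp.Job.Id)
}
```

The classic-template branch sends the same `Parameters` map through `LaunchTemplateParameters`, so the new key rides along both launch paths.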
