
Commit

Revert "Prevent Dataflow options in parameters (#11153) (#7943)"
This reverts commit 3b4aac2.
shuyama1 committed Aug 19, 2024
1 parent f95d017 commit 22b8ca4
Showing 3 changed files with 52 additions and 59 deletions.
3 changes: 0 additions & 3 deletions .changelog/11153.txt

This file was deleted.

@@ -311,23 +311,38 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
 func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {
 
 	updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
-	if err := hasIllegalParametersErr(d); err != nil {
-		return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
-	}
 
 	additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))
 
 	var autoscalingAlgorithm string
 	autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)
 
-	numWorkers, err := parseInt64("num_workers", d)
-	if err != nil {
-		return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
+	var numWorkers int
+	if p, ok := d.GetOk("parameters.numWorkers"); ok {
+		number, err := strconv.Atoi(p.(string))
+		if err != nil {
+			return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
+		}
+		delete(updatedParameters, "numWorkers")
+		numWorkers = number
+	} else {
+		if v, ok := d.GetOk("num_workers"); ok {
+			numWorkers = v.(int)
+		}
 	}
 
-	maxNumWorkers, err := parseInt64("max_workers", d)
-	if err != nil {
-		return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
+	var maxNumWorkers int
+	if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
+		number, err := strconv.Atoi(p.(string))
+		if err != nil {
+			return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
+		}
+		delete(updatedParameters, "maxNumWorkers")
+		maxNumWorkers = number
+	} else {
+		if v, ok := d.GetOk("max_workers"); ok {
+			maxNumWorkers = v.(int)
+		}
 	}
 
 	network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)
@@ -346,9 +361,22 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
 
 	ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)
 
-	enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
-	if err != nil {
-		return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
+	var enableStreamingEngine bool
+	if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
+		delete(updatedParameters, "enableStreamingEngine")
+		e := strings.ToLower(p.(string))
+		switch e {
+		case "true":
+			enableStreamingEngine = true
+		case "false":
+			enableStreamingEngine = false
+		default:
+			return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
+		}
+	} else {
+		if v, ok := d.GetOk("enable_streaming_engine"); ok {
+			enableStreamingEngine = v.(bool)
+		}
 	}
 
 	sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)
@@ -358,8 +386,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
 	env := dataflow.FlexTemplateRuntimeEnvironment{
 		AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
 		AutoscalingAlgorithm: autoscalingAlgorithm,
-		NumWorkers:           numWorkers,
-		MaxWorkers:           maxNumWorkers,
+		NumWorkers:           int64(numWorkers),
+		MaxWorkers:           int64(maxNumWorkers),
 		Network:              network,
 		ServiceAccountEmail:  serviceAccountEmail,
 		Subnetwork:           subnetwork,
@@ -812,43 +840,3 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
 	}
 	return nil
 }
-
-func hasIllegalParametersErr(d *schema.ResourceData) error {
-	pKey := "parameters"
-	errFmt := "%s must not include Dataflow options, found: %s"
-	for k := range ResourceDataflowFlexTemplateJob().Schema {
-		if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
-			return fmt.Errorf(errFmt, pKey, k)
-		}
-		kk := tpgresource.SnakeToPascalCase(k)
-		kk = strings.ToLower(kk)
-		if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
-			return fmt.Errorf(errFmt, pKey, kk)
-		}
-	}
-	return nil
-}
-
-func parseInt64(name string, d *schema.ResourceData) (int64, error) {
-	v, ok := d.GetOk(name)
-	if !ok {
-		return 0, nil
-	}
-	vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
-	if err != nil {
-		return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
-	}
-	return vv, nil
-}
-
-func parseBool(name string, d *schema.ResourceData) (bool, error) {
-	v, ok := d.GetOk(name)
-	if !ok {
-		return false, nil
-	}
-	vv, err := strconv.ParseBool(fmt.Sprint(v))
-	if err != nil {
-		return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
-	}
-	return vv, nil
-}
@@ -582,12 +582,20 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T)
 		CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
 		Steps: []resource.TestStep{
 			{
-				Config:      testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
-				ExpectError: regexp.MustCompile("must not include Dataflow options"),
+				Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
+				Check: resource.ComposeTestCheckFunc(
+					// Is set
+					resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
+					// Is not set
+					resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
+				),
 			},
 			{
 				Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
 				Check: resource.ComposeTestCheckFunc(
+					// Now is unset
+					resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
+					// Now is set
+					resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
 				),
 			},
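Taken together, the revert restores the pre-#11153 behavior: Dataflow options such as numWorkers, maxNumWorkers, and enableStreamingEngine placed in the free-form parameters map are parsed, removed from the map, and copied into the flex template's runtime environment, with the resource's typed fields used as a fallback. The standalone Go sketch below illustrates that transfer pattern only; the names jobConfig, transferNumWorkers, and transferStreamingEngine are hypothetical and not part of the provider, which reads these values through d.GetOk on the Terraform schema as the hunks above show.

// sketch.go — a minimal, self-contained illustration of the restored
// "transfer from parameters to runtime environment" pattern.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// jobConfig stands in for the typed resource fields involved here.
type jobConfig struct {
	NumWorkers            int
	EnableStreamingEngine bool
}

// transferNumWorkers prefers parameters["numWorkers"], deleting the key once
// it has been consumed, and otherwise falls back to the typed field.
func transferNumWorkers(params map[string]string, cfg jobConfig) (int64, error) {
	if p, ok := params["numWorkers"]; ok {
		n, err := strconv.Atoi(p)
		if err != nil {
			return 0, fmt.Errorf("parameters.numWorkers must be an integer, got %q", p)
		}
		delete(params, "numWorkers")
		return int64(n), nil
	}
	return int64(cfg.NumWorkers), nil
}

// transferStreamingEngine does the same for the boolean option, accepting
// only the strings "true" and "false" (case-insensitive).
func transferStreamingEngine(params map[string]string, cfg jobConfig) (bool, error) {
	if p, ok := params["enableStreamingEngine"]; ok {
		delete(params, "enableStreamingEngine")
		switch strings.ToLower(p) {
		case "true":
			return true, nil
		case "false":
			return false, nil
		default:
			return false, fmt.Errorf("parameters.enableStreamingEngine must be true or false, got %q", p)
		}
	}
	return cfg.EnableStreamingEngine, nil
}

func main() {
	// Illustrative inputs: one Dataflow option mixed in with a real template parameter.
	params := map[string]string{"numWorkers": "3", "input": "gs://my-bucket/input"}
	cfg := jobConfig{EnableStreamingEngine: true}

	workers, err := transferNumWorkers(params, cfg)
	if err != nil {
		panic(err)
	}
	streaming, _ := transferStreamingEngine(params, cfg)

	// "numWorkers" has been consumed; only genuine template parameters remain.
	fmt.Println(workers, streaming, params)
}

The design choice the revert reinstates is tolerance rather than validation: instead of rejecting option-like keys in parameters (the deleted hasIllegalParametersErr), the provider silently promotes them into the runtime environment, which is what the restored acceptance-test steps assert.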
