Skip to content

Commit

Permalink
Prevent Dataflow options in parameters (GoogleCloudPlatform#11153)
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas authored Aug 9, 2024
1 parent 3c26638 commit 2897e86
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,38 +310,23 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {

updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
if err := hasIllegalParametersErr(d); err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))

var autoscalingAlgorithm string
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)

var numWorkers int
if p, ok := d.GetOk("parameters.numWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "numWorkers")
numWorkers = number
} else {
if v, ok := d.GetOk("num_workers"); ok {
numWorkers = v.(int)
}
numWorkers, err := parseInt64("num_workers", d)
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

var maxNumWorkers int
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "maxNumWorkers")
maxNumWorkers = number
} else {
if v, ok := d.GetOk("max_workers"); ok {
maxNumWorkers = v.(int)
}
maxNumWorkers, err := parseInt64("max_workers", d)
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)
Expand All @@ -360,23 +345,10 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t

ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)

var enableStreamingEngine bool
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
delete(updatedParameters, "enableStreamingEngine")
e := strings.ToLower(p.(string))
switch e {
case "true":
enableStreamingEngine = true
case "false":
enableStreamingEngine = false
default:
return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
}
} else {
if v, ok := d.GetOk("enable_streaming_engine"); ok {
enableStreamingEngine = v.(bool)
}
}
enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)

Expand All @@ -385,8 +357,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
env := dataflow.FlexTemplateRuntimeEnvironment{
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
AutoscalingAlgorithm: autoscalingAlgorithm,
NumWorkers: int64(numWorkers),
MaxWorkers: int64(maxNumWorkers),
NumWorkers: numWorkers,
MaxWorkers: maxNumWorkers,
Network: network,
ServiceAccountEmail: serviceAccountEmail,
Subnetwork: subnetwork,
Expand Down Expand Up @@ -840,4 +812,44 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
return nil
}

func hasIllegalParametersErr(d *schema.ResourceData) error {
pKey := "parameters"
errFmt := "%s must not include Dataflow options, found: %s"
for k := range ResourceDataflowFlexTemplateJob().Schema {
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
return fmt.Errorf(errFmt, pKey, k)
}
kk := tpgresource.SnakeToPascalCase(k)
kk = strings.ToLower(kk)
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
return fmt.Errorf(errFmt, pKey, kk)
}
}
return nil
}

func parseInt64(name string, d *schema.ResourceData) (int64, error) {
v, ok := d.GetOk(name)
if !ok {
return 0, nil
}
vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
if err != nil {
return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
}
return vv, nil
}

func parseBool(name string, d *schema.ResourceData) (bool, error) {
v, ok := d.GetOk(name)
if !ok {
return false, nil
}
vv, err := strconv.ParseBool(fmt.Sprint(v))
if err != nil {
return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
}
return vv, nil
}

{{ end }}
Original file line number Diff line number Diff line change
Expand Up @@ -581,20 +581,12 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
Check: resource.ComposeTestCheckFunc(
// Is set
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
// Is not set
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
),
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
ExpectError: regexp.MustCompile("must not include Dataflow options"),
},
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
Check: resource.ComposeTestCheckFunc(
// Now is unset
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
// Now is set
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,39 +311,24 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {

updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
if err := hasIllegalParametersErr(d); err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))

var autoscalingAlgorithm string
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)

var numWorkers int
if p, ok := d.GetOk("parameters.numWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "numWorkers")
numWorkers = number
} else {
if v, ok := d.GetOk("num_workers"); ok {
numWorkers = v.(int)
}
}
numWorkers, err := parseInt64("num_workers", d)
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

var maxNumWorkers int
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "maxNumWorkers")
maxNumWorkers = number
} else {
if v, ok := d.GetOk("max_workers"); ok {
maxNumWorkers = v.(int)
}
}
maxNumWorkers, err := parseInt64("max_workers", d)
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)

Expand All @@ -361,23 +346,10 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t

ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)

var enableStreamingEngine bool
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
delete(updatedParameters, "enableStreamingEngine")
e := strings.ToLower(p.(string))
switch e {
case "true":
enableStreamingEngine = true
case "false":
enableStreamingEngine = false
default:
return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
}
} else {
if v, ok := d.GetOk("enable_streaming_engine"); ok {
enableStreamingEngine = v.(bool)
}
}
enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
}

sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)

Expand All @@ -386,8 +358,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
env := dataflow.FlexTemplateRuntimeEnvironment{
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
AutoscalingAlgorithm: autoscalingAlgorithm,
NumWorkers: int64(numWorkers),
MaxWorkers: int64(maxNumWorkers),
NumWorkers: numWorkers,
MaxWorkers: maxNumWorkers,
Network: network,
ServiceAccountEmail: serviceAccountEmail,
Subnetwork: subnetwork,
Expand Down Expand Up @@ -841,4 +813,44 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
return nil
}

func hasIllegalParametersErr(d *schema.ResourceData) error {
pKey := "parameters"
errFmt := "%s must not include Dataflow options, found: %s"
for k := range ResourceDataflowFlexTemplateJob().Schema {
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
return fmt.Errorf(errFmt, pKey, k)
}
kk :=tpgresource.SnakeToPascalCase(k)
kk = strings.ToLower(kk)
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
return fmt.Errorf(errFmt, pKey, kk)
}
}
return nil
}

func parseInt64(name string, d *schema.ResourceData) (int64, error) {
v, ok := d.GetOk(name)
if !ok {
return 0, nil
}
vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
if err != nil {
return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
}
return vv, nil
}

func parseBool(name string, d *schema.ResourceData) (bool, error) {
v, ok := d.GetOk(name)
if !ok {
return false, nil
}
vv, err := strconv.ParseBool(fmt.Sprint(v))
if err != nil {
return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
}
return vv, nil
}

<% end -%>
Original file line number Diff line number Diff line change
Expand Up @@ -583,19 +583,11 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
Steps: []resource.TestStep{
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
Check: resource.ComposeTestCheckFunc(
// Is set
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
// Is not set
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
),
ExpectError: regexp.MustCompile("must not include Dataflow options"),
},
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
Check: resource.ComposeTestCheckFunc(
// Now is unset
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
// Now is set
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
),
},
Expand Down

0 comments on commit 2897e86

Please sign in to comment.