diff --git a/mmv1/third_party/terraform/services/container/node_config.go.tmpl b/mmv1/third_party/terraform/services/container/node_config.go.tmpl index af01c0d42d65..a62ec37a8937 100644 --- a/mmv1/third_party/terraform/services/container/node_config.go.tmpl +++ b/mmv1/third_party/terraform/services/container/node_config.go.tmpl @@ -2029,10 +2029,13 @@ func flattenHostMaintenancePolicy(c *container.HostMaintenancePolicy) []map[stri // node pool updates in `resource_container_cluster` func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Config, nodePoolInfo *NodePoolInformation, prefix, name string, timeout time.Duration) error { - // Nodepool write-lock will be acquired when update function is called. + // Cluster write-lock will be acquired when createOpF is called, and read-lock will be acquired when waitOpF is + // called. + clusterLockKey := nodePoolInfo.clusterLockKey() + // Nodepool write-lock will be acquired when calling createOpF and waitOpF. npLockKey := nodePoolInfo.nodePoolLockKey(name) - userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) + userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) if err != nil { return err } @@ -2051,17 +2054,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf }, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2069,8 +2070,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated logging_variant for node pool %s", name) @@ -2080,24 +2081,22 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf if d.HasChange(prefix + "node_config.0.containerd_config") { if _, ok := d.GetOk(prefix + "node_config.0.containerd_config"); ok { req := &container.UpdateNodePoolRequest{ - Name: name, + Name: name, ContainerdConfig: expandContainerdConfig(d.Get(prefix + "node_config.0.containerd_config")), } if req.ContainerdConfig == nil { req.ContainerdConfig = &container.ContainerdConfig{} req.ForceSendFields = []string{"ContainerdConfig"} } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, 
nodePoolInfo.location, @@ -2105,8 +2104,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated containerd_config for node pool %s", name) @@ -2134,17 +2133,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.StoragePools = storagePools } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2152,7 +2149,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated disk disk_size_gb/disk_type/machine_type/storage_pools for Node Pool %s", d.Id()) @@ -2190,17 +2187,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.Taints = ntaints } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2208,8 +2203,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated taints for Node Pool %s", d.Id()) } @@ -2242,17 +2237,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.Tags = ntags } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return 
ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2260,8 +2253,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated tags for node pool %s", name) } @@ -2284,17 +2277,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.ResourceManagerTags = rmTags } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2302,7 +2293,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { return err } log.Printf("[INFO] Updated resource manager tags for node pool %s", name) @@ -2320,17 +2311,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2339,8 +2328,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } // Call update serially. 
- if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated resource labels for node pool %s", name) @@ -2358,17 +2347,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2377,8 +2364,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } // Call update serially. - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated labels for node pool %s", name) @@ -2392,25 +2379,23 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf }, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterUpdateCall.Do() - if err != nil { - return err - } + return clusterUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated image type in Node Pool %s", d.Id()) } @@ -2425,18 +2410,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.WorkloadMetadataConfig = &container.WorkloadMetadataConfig{} req.ForceSendFields = []string{"WorkloadMetadataConfig"} } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ 
-2444,8 +2426,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name) } @@ -2458,17 +2440,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf Enabled: gcfsEnabled, }, } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2476,8 +2456,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated gcfs_config for node pool %s", name) @@ -2493,17 +2473,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.KubeletConfig = &container.NodeKubeletConfig{} req.ForceSendFields = []string{"KubeletConfig"} } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2511,8 +2489,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated kubelet_config for node pool %s", name) @@ -2527,17 +2505,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf req.LinuxNodeConfig = &container.LinuxNodeConfig{} req.ForceSendFields = []string{"LinuxNodeConfig"} } - updateF := func() error { - clusterNodePoolsUpdateCall := 
config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2545,8 +2521,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated linux_node_config for node pool %s", name) @@ -2558,21 +2534,19 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf } if v, ok := d.GetOk(prefix + "node_config.0.fast_socket"); ok { fastSocket := v.([]interface{})[0].(map[string]interface{}) - req.FastSocket = &container.FastSocket{ + req.FastSocket = &container.FastSocket{ Enabled: fastSocket["enabled"].(bool), } } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -2580,8 +2554,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated fast_socket for node pool %s", name) diff --git a/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl b/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl index ccda8717798e..ff207af238ac 100644 --- a/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl +++ b/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl @@ -568,11 +568,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e return err } - // Acquire read-lock on cluster. - clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) - // Acquire write-lock on nodepool. 
npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name) transport_tpg.MutexStore.Lock(npLockKey) @@ -585,6 +580,9 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e timeout := d.Timeout(schema.TimeoutCreate) startTime := time.Now() + clusterLockKey := nodePoolInfo.clusterLockKey() + transport_tpg.MutexStore.RLock(clusterLockKey) + // we attempt to prefetch the node pool to make sure it doesn't exist before creation var id = fmt.Sprintf("projects/%s/locations/%s/clusters/%s/nodePools/%s", nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, nodePool.Name) name := getNodePoolName(id) @@ -599,11 +597,16 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e // refreshed on the next call to apply. d.SetId(id) } else if err == nil { + transport_tpg.MutexStore.RUnlock(clusterLockKey) return fmt.Errorf("resource - %s - already exists", id) } + transport_tpg.MutexStore.RUnlock(clusterLockKey) var operation *container.Operation err = retry.Retry(timeout, func() *retry.RetryError { + transport_tpg.MutexStore.Lock(clusterLockKey) + defer transport_tpg.MutexStore.Unlock(clusterLockKey) + clusterNodePoolsCreateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Create(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterNodePoolsCreateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) @@ -622,6 +625,8 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e } return nil }) + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) if err != nil { return fmt.Errorf("error creating NodePool: %s", err) } @@ -796,10 +801,7 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e } } - // Acquire read-lock on cluster. clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) // Acquire write-lock on nodepool. npLockKey := nodePoolInfo.nodePoolLockKey(name) @@ -811,6 +813,8 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e var operation *container.Operation err = retry.Retry(timeout, func() *retry.RetryError { + transport_tpg.MutexStore.Lock(clusterLockKey) + defer transport_tpg.MutexStore.Unlock(clusterLockKey) clusterNodePoolsDeleteCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Delete(nodePoolInfo.fullyQualifiedName(name)) if config.UserProjectOverride { clusterNodePoolsDeleteCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) @@ -830,6 +834,8 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e return nil }) + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) if err != nil { return fmt.Errorf("Error deleting NodePool: %s", err) @@ -1346,22 +1352,20 @@ func expandNodeNetworkConfig(v interface{}) *container.NodeNetworkConfig { return nnc } - func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *NodePoolInformation, prefix string, timeout time.Duration) error { config := meta.(*transport_tpg.Config) name := d.Get(prefix + "name").(string) - userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) + userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) if err != nil { return err } - // Acquire read-lock on cluster. 
+ // Cluster write-lock will be acquired when createOpF is called, and read-lock will be acquired when waitOpF is + // called. clusterLockKey := nodePoolInfo.clusterLockKey() - transport_tpg.MutexStore.RLock(clusterLockKey) - defer transport_tpg.MutexStore.RUnlock(clusterLockKey) - // Nodepool write-lock will be acquired when update function is called. + // Nodepool write-lock will be acquired when calling createOpF and waitOpF. npLockKey := nodePoolInfo.nodePoolLockKey(name) if d.HasChange(prefix + "autoscaling") { @@ -1389,25 +1393,23 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node Update: update, } - updateF := func() error { + createOpF := func() (*container.Operation, error) { clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req) if config.UserProjectOverride { clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterUpdateCall.Do() - if err != nil { - return err - } + return clusterUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id()) } @@ -1421,25 +1423,22 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.SetNodePoolSizeRequest{ NodeCount: newSize, } - updateF := func() error { - clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsSetSizeCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsSetSizeCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsSetSizeCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool size", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize) } @@ -1456,25 +1455,22 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node Management: management, } - updateF := func() error { - clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { 
clusterNodePoolsSetManagementCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsSetManagementCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsSetManagementCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated management in Node Pool %s", name) } @@ -1484,24 +1480,21 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node NodePoolId: name, NodeVersion: d.Get(prefix + "version").(string), } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated version in Node Pool %s", name) } @@ -1510,23 +1503,20 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.UpdateNodePoolRequest{ Locations: tpgresource.ConvertStringSet(d.Get(prefix + "node_locations").(*schema.Set)), } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated node locations in Node Pool %s", name) } @@ -1574,7 +1564,7 @@ func nodePoolUpdate(d 
*schema.ResourceData, meta interface{}, nodePoolInfo *Node if v, ok := blueGreenSettingsConfig["standard_rollout_policy"]; ok && len(v.([]interface{})) > 0 { standardRolloutPolicy := &container.StandardRolloutPolicy{} - if standardRolloutPolicyConfig, ok := v.([]interface{})[0].(map[string]interface{}); ok { + if standardRolloutPolicyConfig, ok := v.([]interface{})[0].(map[string]interface{}); ok { standardRolloutPolicy.BatchSoakDuration = standardRolloutPolicyConfig["batch_soak_duration"].(string) if v, ok := standardRolloutPolicyConfig["batch_node_count"]; ok { standardRolloutPolicy.BatchNodeCount = int64(v.(int)) @@ -1591,44 +1581,38 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node req := &container.UpdateNodePoolRequest{ UpgradeSettings: upgradeSettings, } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name) } if d.HasChange(prefix + "network_config") { - if d.HasChange(prefix + "network_config.0.enable_private_nodes") || d.HasChange(prefix + "network_config.0.network_performance_config") { + if d.HasChange(prefix+"network_config.0.enable_private_nodes") || d.HasChange(prefix+"network_config.0.network_performance_config") { req := &container.UpdateNodePoolRequest{ - NodePoolId: name, + NodePoolId: name, NodeNetworkConfig: expandNodeNetworkConfig(d.Get(prefix + "network_config")), } - updateF := func() error { - clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req) + createOpF := func() (*container.Operation, error) { + clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req) if config.UserProjectOverride { clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project) } - op, err := clusterNodePoolsUpdateCall.Do() - - if err != nil { - return err - } + return clusterNodePoolsUpdateCall.Do() + } - // Wait until it's updated + waitOpF := func(op *container.Operation) error { return ContainerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, @@ -1636,8 +1620,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node timeout) } - if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil { - return err + if err := retryWhileIncompatibleOperation(timeout, npLockKey, 
clusterLockKey, createOpF, waitOpF); err != nil { + return err } log.Printf("[INFO] Updated network_config for node pool %s", name) @@ -1688,13 +1672,24 @@ func containerNodePoolAwaitRestingState(config *transport_tpg.Config, name, proj return state, err } -// Retries an operation while the canonical error code is FAILED_PRECONDTION -// or RESOURCE_EXHAUSTED which indicates there is an incompatible operation -// already running on the cluster or there are the number of allowed -// concurrent operations running on the cluster. These errors can be safely -// retried until the incompatible operation completes, and the newly -// requested operation can begin. -func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error { +// Retries an operation while the canonical error code is FAILED_PRECONDITION or RESOURCE_EXHAUSTED, which indicates +// there is an incompatible operation already running on the cluster or the maximum number of allowed concurrent +// operations is already running on the cluster. These errors can be safely retried until the incompatible operation completes, +// and the newly requested operation can begin. +// The cluster write-lock is held during createOpFunc to make operation creation sequential, and the cluster read-lock +// is held during waitOpFunc to allow concurrency. +func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, clusterLockKey string, createOpFunc func() (*container.Operation, error), waitOpFunc func(*container.Operation) error) error { + f := func() error { + transport_tpg.MutexStore.Lock(clusterLockKey) + op, err := createOpFunc() + transport_tpg.MutexStore.Unlock(clusterLockKey) + if err != nil { + return err + } + transport_tpg.MutexStore.RLock(clusterLockKey) + defer transport_tpg.MutexStore.RUnlock(clusterLockKey) + return waitOpFunc(op) + } return retry.Retry(timeout, func() *retry.RetryError { if err := transport_tpg.LockedCall(lockKey, f); err != nil { if tpgresource.IsFailedPreconditionError(err) || tpgresource.IsQuotaError(err) {
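
For reference, below is a minimal, self-contained sketch (not provider code) of the lock-ordering pattern that retryWhileIncompatibleOperation and the create/delete paths now share: the cluster key is write-locked only while an operation is created, then downgraded to a read-lock while the operation is waited on, so operation creation is serialized per cluster while multiple node-pool operations on the same cluster can wait concurrently. The keyedRWMutex type is a simplified stand-in for transport_tpg.MutexStore, and operation, createOp, and waitOp are hypothetical placeholders rather than provider APIs.

// retry_lock_sketch.go - a minimal sketch of the create/wait lock pattern, assuming
// a simplified keyed RWMutex store in place of transport_tpg.MutexStore.
package main

import (
	"fmt"
	"sync"
	"time"
)

// keyedRWMutex hands out one RWMutex per string key (stand-in for transport_tpg.MutexStore).
type keyedRWMutex struct {
	mu      sync.Mutex
	mutexes map[string]*sync.RWMutex
}

func (k *keyedRWMutex) get(key string) *sync.RWMutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.mutexes == nil {
		k.mutexes = make(map[string]*sync.RWMutex)
	}
	if _, ok := k.mutexes[key]; !ok {
		k.mutexes[key] = &sync.RWMutex{}
	}
	return k.mutexes[key]
}

// operation is a hypothetical placeholder for *container.Operation.
type operation struct{ name string }

// runWithClusterLocks mirrors the body of the `f` closure built inside
// retryWhileIncompatibleOperation: write-lock the cluster key while the
// operation is created, then hold only a read-lock while waiting on it.
func runWithClusterLocks(store *keyedRWMutex, clusterKey string,
	createOp func() (*operation, error), waitOp func(*operation) error) error {

	m := store.get(clusterKey)

	// Serialize operation creation per cluster.
	m.Lock()
	op, err := createOp()
	m.Unlock()
	if err != nil {
		return err
	}

	// Downgrade to a read-lock while polling, so other node-pool operations
	// on the same cluster can be created once this one exists.
	m.RLock()
	defer m.RUnlock()
	return waitOp(op)
}

func main() {
	store := &keyedRWMutex{}
	clusterKey := "projects/p/locations/l/clusters/c"

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			err := runWithClusterLocks(store, clusterKey,
				func() (*operation, error) {
					return &operation{name: fmt.Sprintf("op-%d", i)}, nil
				},
				func(op *operation) error {
					time.Sleep(10 * time.Millisecond) // stands in for ContainerOperationWait
					fmt.Println("finished waiting for", op.name)
					return nil
				})
			if err != nil {
				fmt.Println("error:", err)
			}
		}(i)
	}
	wg.Wait()
}

In the real resource the node-pool write-lock is still taken around the whole closure via transport_tpg.LockedCall(lockKey, f); that wrapper is omitted from the sketch for brevity.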