diff --git a/src/azure-cli/azure/cli/command_modules/acs/tests/latest/test_aks_commands.py b/src/azure-cli/azure/cli/command_modules/acs/tests/latest/test_aks_commands.py
index 186fe8b8bcc..5b2e9004651 100644
--- a/src/azure-cli/azure/cli/command_modules/acs/tests/latest/test_aks_commands.py
+++ b/src/azure-cli/azure/cli/command_modules/acs/tests/latest/test_aks_commands.py
@@ -9139,358 +9139,6 @@ def test_aks_update_with_azurekeyvaultkms_public_key_vault(self, resource_group, resource_group_location):
 self.is_empty(),
 ])
- @live_only()
- @AllowLargeResponse()
- @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westcentralus')
- def test_aks_create_with_azurekeyvaultkms_private_key_vault(self, resource_group, resource_group_location):
- aks_name = self.create_random_name('cliakstest', 16)
- kv_name = self.create_random_name('cliakstestkv', 16)
- identity_name = self.create_random_name('cliakstestidentity', 24)
- self.kwargs.update({
- 'resource_group': resource_group,
- 'name': aks_name,
- "kv_name": kv_name,
- "identity_name": identity_name,
- 'ssh_key_value': self.generate_ssh_keys()
- })
-
- # create user-assigned identity
- identity_id = self._get_user_assigned_identity(resource_group)
- identity_object_id = self._get_principal_id_of_user_assigned_identity(identity_id)
- assert identity_id is not None
- assert identity_object_id is not None
- self.kwargs.update({
- 'identity_id': identity_id,
- 'identity_object_id': identity_object_id,
- })
-
- # create key vault and key
- create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} --enable-rbac-authorization=false --no-self-perms -o json'
- kv = self.cmd(create_keyvault, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
- kv_resource_id = kv['id']
- assert kv_resource_id is not None
- self.kwargs.update({
- 'kv_resource_id': kv_resource_id,
- })
-
- # set access policy for test identity
- test_identity_object_id = self._get_test_identity_object_id()
- test_identity_access_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
- '--key-permissions all --object-id ' + test_identity_object_id
- self.cmd(test_identity_access_policy, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ])
-
- create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json'
- key = self.cmd(create_key, checks=[
- self.check('attributes.enabled', True)
- ]).get_output_in_json()
- key_id_0 = key['key']['kid']
- assert key_id_0 is not None
- self.kwargs.update({
- 'key_id': key_id_0,
- })
-
- # assign access policy
- set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
- '--object-id {identity_object_id} --key-permissions encrypt decrypt -o json'
- policy = self.cmd(set_policy, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- # allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action)
- create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \
- '--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \
- '--scope {kv_resource_id}'
- role_assignment = self.cmd(create_role_assignment).get_output_in_json()
-
- # disable public network access
- disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
- kv = self.cmd(disable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \
- '--assign-identity {identity_id} ' \
- '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
- '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
- '--ssh-key-value={ssh_key_value} -o json'
- self.cmd(create_cmd, checks=[
- self.check('provisioningState', 'Succeeded'),
- self.check('securityProfile.azureKeyVaultKms.enabled', True),
- self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0),
- self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
- self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
- ])
-
- # enable public network access
- enable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Enabled" -o json'
- kv = self.cmd(enable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- key = self.cmd(create_key, checks=[
- self.check('attributes.enabled', True)
- ]).get_output_in_json()
- key_id_1 = key['key']['kid']
- assert key_id_1 is not None
- self.kwargs.update({
- 'key_id': key_id_1,
- })
-
- # disable public network access
- disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
- kv = self.cmd(disable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- # Rotate key
- update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \
- '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
- '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
- '-o json'
- self.cmd(update_cmd, checks=[
- self.check('provisioningState', 'Succeeded'),
- self.check('securityProfile.azureKeyVaultKms.enabled', True),
- self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1),
- self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
- self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
- ])
-
- # delete
- cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait'
- self.cmd(cmd, checks=[
- self.is_empty(),
- ])
-
- @live_only()
- @AllowLargeResponse()
- @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westcentralus')
- def test_aks_update_with_azurekeyvaultkms_private_key_vault(self, resource_group, resource_group_location):
- aks_name = self.create_random_name('cliakstest', 16)
- kv_name = self.create_random_name('cliakstestkv', 16)
- identity_name = self.create_random_name('cliakstestidentity', 24)
- self.kwargs.update({
- 'resource_group': resource_group,
- 'name': aks_name,
- "kv_name": kv_name,
- "identity_name": identity_name,
- 'ssh_key_value': self.generate_ssh_keys()
- })
-
- # create user-assigned identity
- identity_id = self._get_user_assigned_identity(resource_group)
- identity_object_id = self._get_principal_id_of_user_assigned_identity(identity_id)
- assert identity_id is not None
- assert identity_object_id is not None
- self.kwargs.update({
- 'identity_id': identity_id,
- 'identity_object_id': identity_object_id,
- })
-
- # create key vault and key
- create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} --enable-rbac-authorization=false --no-self-perms -o json'
- kv = self.cmd(create_keyvault, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
- kv_resource_id = kv['id']
- assert kv_resource_id is not None
- self.kwargs.update({
- 'kv_resource_id': kv_resource_id,
- })
-
- # set access policy for test identity
- test_identity_object_id = self._get_test_identity_object_id()
- test_identity_access_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
- '--key-permissions all --object-id ' + test_identity_object_id
- self.cmd(test_identity_access_policy, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ])
-
- create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json'
- key = self.cmd(create_key, checks=[
- self.check('attributes.enabled', True)
- ]).get_output_in_json()
- key_id = key['key']['kid']
- assert key_id is not None
- self.kwargs.update({
- 'key_id': key_id,
- })
-
- # assign access policy
- set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
- '--object-id {identity_object_id} --key-permissions encrypt decrypt -o json'
- policy = self.cmd(set_policy, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- # allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action)
- create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \
- '--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \
- '--scope {kv_resource_id}'
- role_assignment = self.cmd(create_role_assignment).get_output_in_json()
-
- # disable public network access
- disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
- kv = self.cmd(disable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \
- '--assign-identity {identity_id} ' \
- '--ssh-key-value={ssh_key_value} -o json'
- self.cmd(create_cmd, checks=[
- self.check('provisioningState', 'Succeeded'),
- self.not_exists('securityProfile.azureKeyVaultKms')
- ])
-
- update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \
- '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
- '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
- '-o json'
- self.cmd(update_cmd, checks=[
- self.check('provisioningState', 'Succeeded'),
- self.check('securityProfile.azureKeyVaultKms.enabled', True),
- self.check('securityProfile.azureKeyVaultKms.keyId', key_id),
- self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
- self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
- ])
-
- # delete
- cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait'
- self.cmd(cmd, checks=[
- self.is_empty(),
- ])
-
- @live_only()
- @AllowLargeResponse()
- @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westcentralus')
- def test_aks_create_with_azurekeyvaultkms_private_cluster_v1_private_key_vault(self, resource_group, resource_group_location):
- aks_name = self.create_random_name('cliakstest', 16)
- kv_name = self.create_random_name('cliakstestkv', 16)
- identity_name = self.create_random_name('cliakstestidentity', 24)
- self.kwargs.update({
- 'resource_group': resource_group,
- 'name': aks_name,
- "kv_name": kv_name,
- "identity_name": identity_name,
- 'ssh_key_value': self.generate_ssh_keys()
- })
-
- # create user-assigned identity
- identity_id = self._get_user_assigned_identity(resource_group)
- identity_object_id = self._get_principal_id_of_user_assigned_identity(identity_id)
- assert identity_id is not None
- assert identity_object_id is not None
- self.kwargs.update({
- 'identity_id': identity_id,
- 'identity_object_id': identity_object_id,
- })
-
- # create key vault and key
- create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} --enable-rbac-authorization=false --no-self-perms -o json'
- kv = self.cmd(create_keyvault, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
- kv_resource_id = kv['id']
- assert kv_resource_id is not None
- self.kwargs.update({
- 'kv_resource_id': kv_resource_id,
- })
-
- # set access policy for test identity
- test_identity_object_id = self._get_test_identity_object_id()
- test_identity_access_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
- '--key-permissions all --object-id ' + test_identity_object_id
- self.cmd(test_identity_access_policy, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ])
-
- create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json'
- key = self.cmd(create_key, checks=[
- self.check('attributes.enabled', True)
- ]).get_output_in_json()
- key_id_0 = key['key']['kid']
- assert key_id_0 is not None
- self.kwargs.update({
- 'key_id': key_id_0,
- })
-
- # assign access policy
- set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
- '--object-id {identity_object_id} --key-permissions encrypt decrypt -o json'
- policy = self.cmd(set_policy, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- # allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action)
- create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \
- '--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \
- '--scope {kv_resource_id}'
- role_assignment = self.cmd(create_role_assignment).get_output_in_json()
-
- # disable public network access
- disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
- kv = self.cmd(disable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \
- '--assign-identity {identity_id} --enable-private-cluster ' \
- '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
- '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
- '--ssh-key-value={ssh_key_value} -o json'
- self.cmd(create_cmd, checks=[
- self.check('provisioningState', 'Succeeded'),
- self.check('apiServerAccessProfile.enablePrivateCluster', 'True'),
- self.check('securityProfile.azureKeyVaultKms.enabled', True),
- self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0),
- self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
- self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
- ])
-
- # enable public network access
- enable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Enabled" -o json'
- kv = self.cmd(enable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- key = self.cmd(create_key, checks=[
- self.check('attributes.enabled', True)
- ]).get_output_in_json()
- key_id_1 = key['key']['kid']
- assert key_id_1 is not None
- self.kwargs.update({
- 'key_id': key_id_1,
- })
-
- # disable public network access
- disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
- kv = self.cmd(disable_public_network_access, checks=[
- self.check('properties.provisioningState', 'Succeeded')
- ]).get_output_in_json()
-
- # Rotate key
- update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \
- '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
- '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
- '-o json'
- self.cmd(update_cmd, checks=[
- self.check('provisioningState', 'Succeeded'),
- self.check('securityProfile.azureKeyVaultKms.enabled', True),
- self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1),
- self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
- self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
- ])
-
- # delete
- cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait'
- self.cmd(cmd, checks=[
- self.is_empty(),
- ])
- @AllowLargeResponse()
- @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westus2')
- def test_aks_create_with_image_cleaner_enabled_with_default_interval_hours(self, resource_group, resource_group_location):