Skip to content

Commit

Permalink
{AKS} Update tests (#29965)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzhecheng authored Oct 8, 2024
1 parent c8abe8b commit d32bd7e
Showing 1 changed file with 0 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9139,358 +9139,6 @@ def test_aks_update_with_azurekeyvaultkms_public_key_vault(self, resource_group,
self.is_empty(),
])

@live_only()
@AllowLargeResponse()
@AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westcentralus')
def test_aks_create_with_azurekeyvaultkms_private_key_vault(self, resource_group, resource_group_location):
aks_name = self.create_random_name('cliakstest', 16)
kv_name = self.create_random_name('cliakstestkv', 16)
identity_name = self.create_random_name('cliakstestidentity', 24)
self.kwargs.update({
'resource_group': resource_group,
'name': aks_name,
"kv_name": kv_name,
"identity_name": identity_name,
'ssh_key_value': self.generate_ssh_keys()
})

# create user-assigned identity
identity_id = self._get_user_assigned_identity(resource_group)
identity_object_id = self._get_principal_id_of_user_assigned_identity(identity_id)
assert identity_id is not None
assert identity_object_id is not None
self.kwargs.update({
'identity_id': identity_id,
'identity_object_id': identity_object_id,
})

# create key vault and key
create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} --enable-rbac-authorization=false --no-self-perms -o json'
kv = self.cmd(create_keyvault, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()
kv_resource_id = kv['id']
assert kv_resource_id is not None
self.kwargs.update({
'kv_resource_id': kv_resource_id,
})

# set access policy for test identity
test_identity_object_id = self._get_test_identity_object_id()
test_identity_access_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
'--key-permissions all --object-id ' + test_identity_object_id
self.cmd(test_identity_access_policy, checks=[
self.check('properties.provisioningState', 'Succeeded')
])

create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json'
key = self.cmd(create_key, checks=[
self.check('attributes.enabled', True)
]).get_output_in_json()
key_id_0 = key['key']['kid']
assert key_id_0 is not None
self.kwargs.update({
'key_id': key_id_0,
})

# assign access policy
set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
'--object-id {identity_object_id} --key-permissions encrypt decrypt -o json'
policy = self.cmd(set_policy, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

# allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action)
create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \
'--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \
'--scope {kv_resource_id}'
role_assignment = self.cmd(create_role_assignment).get_output_in_json()

# disable public network access
disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
kv = self.cmd(disable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \
'--assign-identity {identity_id} ' \
'--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
'--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
'--ssh-key-value={ssh_key_value} -o json'
self.cmd(create_cmd, checks=[
self.check('provisioningState', 'Succeeded'),
self.check('securityProfile.azureKeyVaultKms.enabled', True),
self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0),
self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
])

# enable public network access
enable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Enabled" -o json'
kv = self.cmd(enable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

key = self.cmd(create_key, checks=[
self.check('attributes.enabled', True)
]).get_output_in_json()
key_id_1 = key['key']['kid']
assert key_id_1 is not None
self.kwargs.update({
'key_id': key_id_1,
})

# disable public network access
disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
kv = self.cmd(disable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

# Rotate key
update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \
'--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
'--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
'-o json'
self.cmd(update_cmd, checks=[
self.check('provisioningState', 'Succeeded'),
self.check('securityProfile.azureKeyVaultKms.enabled', True),
self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1),
self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
])

# delete
cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait'
self.cmd(cmd, checks=[
self.is_empty(),
])

@live_only()
@AllowLargeResponse()
@AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westcentralus')
def test_aks_update_with_azurekeyvaultkms_private_key_vault(self, resource_group, resource_group_location):
aks_name = self.create_random_name('cliakstest', 16)
kv_name = self.create_random_name('cliakstestkv', 16)
identity_name = self.create_random_name('cliakstestidentity', 24)
self.kwargs.update({
'resource_group': resource_group,
'name': aks_name,
"kv_name": kv_name,
"identity_name": identity_name,
'ssh_key_value': self.generate_ssh_keys()
})

# create user-assigned identity
identity_id = self._get_user_assigned_identity(resource_group)
identity_object_id = self._get_principal_id_of_user_assigned_identity(identity_id)
assert identity_id is not None
assert identity_object_id is not None
self.kwargs.update({
'identity_id': identity_id,
'identity_object_id': identity_object_id,
})

# create key vault and key
create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} --enable-rbac-authorization=false --no-self-perms -o json'
kv = self.cmd(create_keyvault, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()
kv_resource_id = kv['id']
assert kv_resource_id is not None
self.kwargs.update({
'kv_resource_id': kv_resource_id,
})

# set access policy for test identity
test_identity_object_id = self._get_test_identity_object_id()
test_identity_access_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
'--key-permissions all --object-id ' + test_identity_object_id
self.cmd(test_identity_access_policy, checks=[
self.check('properties.provisioningState', 'Succeeded')
])

create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json'
key = self.cmd(create_key, checks=[
self.check('attributes.enabled', True)
]).get_output_in_json()
key_id = key['key']['kid']
assert key_id is not None
self.kwargs.update({
'key_id': key_id,
})

# assign access policy
set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
'--object-id {identity_object_id} --key-permissions encrypt decrypt -o json'
policy = self.cmd(set_policy, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

# allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action)
create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \
'--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \
'--scope {kv_resource_id}'
role_assignment = self.cmd(create_role_assignment).get_output_in_json()

# disable public network access
disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
kv = self.cmd(disable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \
'--assign-identity {identity_id} ' \
'--ssh-key-value={ssh_key_value} -o json'
self.cmd(create_cmd, checks=[
self.check('provisioningState', 'Succeeded'),
self.not_exists('securityProfile.azureKeyVaultKms')
])

update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \
'--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
'--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
'-o json'
self.cmd(update_cmd, checks=[
self.check('provisioningState', 'Succeeded'),
self.check('securityProfile.azureKeyVaultKms.enabled', True),
self.check('securityProfile.azureKeyVaultKms.keyId', key_id),
self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
])

# delete
cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait'
self.cmd(cmd, checks=[
self.is_empty(),
])

@live_only()
@AllowLargeResponse()
@AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westcentralus')
def test_aks_create_with_azurekeyvaultkms_private_cluster_v1_private_key_vault(self, resource_group, resource_group_location):
aks_name = self.create_random_name('cliakstest', 16)
kv_name = self.create_random_name('cliakstestkv', 16)
identity_name = self.create_random_name('cliakstestidentity', 24)
self.kwargs.update({
'resource_group': resource_group,
'name': aks_name,
"kv_name": kv_name,
"identity_name": identity_name,
'ssh_key_value': self.generate_ssh_keys()
})

# create user-assigned identity
identity_id = self._get_user_assigned_identity(resource_group)
identity_object_id = self._get_principal_id_of_user_assigned_identity(identity_id)
assert identity_id is not None
assert identity_object_id is not None
self.kwargs.update({
'identity_id': identity_id,
'identity_object_id': identity_object_id,
})

# create key vault and key
create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} --enable-rbac-authorization=false --no-self-perms -o json'
kv = self.cmd(create_keyvault, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()
kv_resource_id = kv['id']
assert kv_resource_id is not None
self.kwargs.update({
'kv_resource_id': kv_resource_id,
})

# set access policy for test identity
test_identity_object_id = self._get_test_identity_object_id()
test_identity_access_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
'--key-permissions all --object-id ' + test_identity_object_id
self.cmd(test_identity_access_policy, checks=[
self.check('properties.provisioningState', 'Succeeded')
])

create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json'
key = self.cmd(create_key, checks=[
self.check('attributes.enabled', True)
]).get_output_in_json()
key_id_0 = key['key']['kid']
assert key_id_0 is not None
self.kwargs.update({
'key_id': key_id_0,
})

# assign access policy
set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \
'--object-id {identity_object_id} --key-permissions encrypt decrypt -o json'
policy = self.cmd(set_policy, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

# allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action)
create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \
'--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \
'--scope {kv_resource_id}'
role_assignment = self.cmd(create_role_assignment).get_output_in_json()

# disable public network access
disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
kv = self.cmd(disable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \
'--assign-identity {identity_id} --enable-private-cluster ' \
'--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
'--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
'--ssh-key-value={ssh_key_value} -o json'
self.cmd(create_cmd, checks=[
self.check('provisioningState', 'Succeeded'),
self.check('apiServerAccessProfile.enablePrivateCluster', 'True'),
self.check('securityProfile.azureKeyVaultKms.enabled', True),
self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0),
self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
])

# enable public network access
enable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Enabled" -o json'
kv = self.cmd(enable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

key = self.cmd(create_key, checks=[
self.check('attributes.enabled', True)
]).get_output_in_json()
key_id_1 = key['key']['kid']
assert key_id_1 is not None
self.kwargs.update({
'key_id': key_id_1,
})

# disable public network access
disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json'
kv = self.cmd(disable_public_network_access, checks=[
self.check('properties.provisioningState', 'Succeeded')
]).get_output_in_json()

# Rotate key
update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \
'--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \
'--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \
'-o json'
self.cmd(update_cmd, checks=[
self.check('provisioningState', 'Succeeded'),
self.check('securityProfile.azureKeyVaultKms.enabled', True),
self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1),
self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"),
self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id)
])

# delete
cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait'
self.cmd(cmd, checks=[
self.is_empty(),
])

@AllowLargeResponse()
@AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='westus2')
def test_aks_create_with_image_cleaner_enabled_with_default_interval_hours(self, resource_group, resource_group_location):
Expand Down

0 comments on commit d32bd7e

Please sign in to comment.