From f9d34c75cfe31cc9f753cbe6aa46c9c83c41133a Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Thu, 16 Nov 2023 15:57:06 +0800 Subject: [PATCH] refactor(negative): adapt libs to keywords longhorn/longhorn-7034 Signed-off-by: Chin-Ya Huang --- e2e/libs/host/__init__.py | 1 + e2e/libs/host/constant.py | 1 + e2e/libs/host/host.py | 77 ++++++++++ e2e/libs/instancemanager/__init__.py | 1 + e2e/libs/instancemanager/instancemanager.py | 35 +++++ e2e/libs/instancemanager/utility.py | 29 ---- e2e/libs/keywords/common_keywords.py | 4 + e2e/libs/keywords/deployment_keywords.py | 40 +++-- e2e/libs/keywords/host_keywords.py | 52 +++++++ e2e/libs/keywords/instancemanager_keywords.py | 13 ++ e2e/libs/keywords/kubelet_keywords.py | 7 +- e2e/libs/keywords/network_keywords.py | 5 + e2e/libs/keywords/node_keywords.py | 38 ----- .../persistentvolumeclaim_keywords.py | 28 +++- e2e/libs/keywords/recurringjob_keywords.py | 24 ++- e2e/libs/keywords/statefulset_keywords.py | 57 +++++-- e2e/libs/keywords/stress_keywords.py | 46 +++++- e2e/libs/keywords/volume_keywords.py | 129 ++++++++++++---- e2e/libs/keywords/workload_keywords.py | 130 ++++++++++++---- e2e/libs/kubelet/kubelet.py | 9 +- e2e/libs/network/network.py | 8 +- e2e/libs/node/node.py | 126 ++++++++-------- e2e/libs/node/stress.py | 14 +- e2e/libs/node/utility.py | 72 +-------- e2e/libs/node_exec/node_exec.py | 7 +- e2e/libs/persistentvolumeclaim/crd.py | 41 +++++- .../persistentvolumeclaim.py | 77 +++++++++- e2e/libs/recurringjob/base.py | 4 - e2e/libs/recurringjob/crd.py | 25 +++- e2e/libs/recurringjob/recurringjob.py | 12 +- e2e/libs/recurringjob/rest.py | 32 ++-- e2e/libs/replica/base.py | 8 +- e2e/libs/replica/crd.py | 6 +- e2e/libs/replica/replica.py | 2 +- e2e/libs/replica/rest.py | 9 +- e2e/libs/utility/constant.py | 10 ++ e2e/libs/utility/utility.py | 19 ++- e2e/libs/volume/base.py | 6 +- e2e/libs/volume/crd.py | 120 +++++++++------ e2e/libs/volume/rest.py | 7 +- e2e/libs/volume/volume.py | 24 ++- e2e/libs/workload/constant.py | 2 +- e2e/libs/workload/deployment.py | 30 ++-- e2e/libs/workload/persistentvolumeclaim.py | 55 ------- e2e/libs/workload/pod.py | 18 ++- e2e/libs/workload/statefulset.py | 57 +++++-- e2e/libs/workload/workload.py | 139 ++++++++++++------ e2e/templates/workload/pvc.yaml | 2 + e2e/templates/workload/statefulset.yaml | 2 + 49 files changed, 1098 insertions(+), 562 deletions(-) create mode 100644 e2e/libs/host/__init__.py create mode 100644 e2e/libs/host/constant.py create mode 100644 e2e/libs/host/host.py create mode 100644 e2e/libs/instancemanager/instancemanager.py delete mode 100644 e2e/libs/instancemanager/utility.py create mode 100644 e2e/libs/keywords/host_keywords.py create mode 100644 e2e/libs/keywords/instancemanager_keywords.py delete mode 100644 e2e/libs/keywords/node_keywords.py create mode 100644 e2e/libs/utility/constant.py delete mode 100644 e2e/libs/workload/persistentvolumeclaim.py diff --git a/e2e/libs/host/__init__.py b/e2e/libs/host/__init__.py new file mode 100644 index 0000000000..49b0c583bd --- /dev/null +++ b/e2e/libs/host/__init__.py @@ -0,0 +1 @@ +from host.host import Host diff --git a/e2e/libs/host/constant.py b/e2e/libs/host/constant.py new file mode 100644 index 0000000000..6a00688011 --- /dev/null +++ b/e2e/libs/host/constant.py @@ -0,0 +1 @@ +NODE_REBOOT_DOWN_TIME_SECOND = 60 diff --git a/e2e/libs/host/host.py b/e2e/libs/host/host.py new file mode 100644 index 0000000000..3c7d521795 --- /dev/null +++ b/e2e/libs/host/host.py @@ -0,0 +1,77 @@ +import boto3 +import time +import yaml + +from host.constant import NODE_REBOOT_DOWN_TIME_SECOND + +from node.node import Node + +from utility.utility import logging +from utility.utility import wait_for_cluster_ready + + +class Host: + + def __init__(self): + with open('/tmp/instance_mapping', 'r') as f: + self.mapping = yaml.safe_load(f) + self.aws_client = boto3.client('ec2') + + self.node = Node() + + def reboot_all_nodes(self, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND): + instance_ids = [value for value in self.mapping.values()] + + resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True) + assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, f"Failed to stop instances {instance_ids} response: {resp}" + logging(f"Stopping instances {instance_ids}") + waiter = self.aws_client.get_waiter('instance_stopped') + waiter.wait(InstanceIds=instance_ids) + + logging(f"Wait for {shut_down_time_in_sec} seconds before starting instances") + time.sleep(shut_down_time_in_sec) + + resp = self.aws_client.start_instances(InstanceIds=instance_ids) + logging(f"Starting instances {instance_ids} response: {resp}") + waiter = self.aws_client.get_waiter('instance_running') + waiter.wait(InstanceIds=instance_ids) + + wait_for_cluster_ready() + + logging(f"Started instances") + + def reboot_node(self, reboot_node_name, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND): + instance_ids = [self.mapping[reboot_node_name]] + + resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True) + assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, f"Failed to stop instances {instance_ids} response: {resp}" + logging(f"Stopping instances {instance_ids}") + waiter = self.aws_client.get_waiter('instance_stopped') + waiter.wait(InstanceIds=instance_ids) + logging(f"Stopped instances") + + time.sleep(shut_down_time_in_sec) + + resp = self.aws_client.start_instances(InstanceIds=instance_ids) + logging(f"Starting instances {instance_ids} response: {resp}") + waiter = self.aws_client.get_waiter('instance_running') + waiter.wait(InstanceIds=instance_ids) + logging(f"Started instances") + + def reboot_all_worker_nodes(self, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND): + instance_ids = [self.mapping[value] for value in self.node.list_node_names_by_role("worker")] + + resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True) + assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, f"Failed to stop instances {instance_ids} response: {resp}" + logging(f"Stopping instances {instance_ids}") + waiter = self.aws_client.get_waiter('instance_stopped') + waiter.wait(InstanceIds=instance_ids) + logging(f"Stopped instances") + + time.sleep(shut_down_time_in_sec) + + resp = self.aws_client.start_instances(InstanceIds=instance_ids) + logging(f"Starting instances {instance_ids} response: {resp}") + waiter = self.aws_client.get_waiter('instance_running') + waiter.wait(InstanceIds=instance_ids) + logging(f"Started instances") diff --git a/e2e/libs/instancemanager/__init__.py b/e2e/libs/instancemanager/__init__.py index e69de29bb2..dfda5a67de 100644 --- a/e2e/libs/instancemanager/__init__.py +++ b/e2e/libs/instancemanager/__init__.py @@ -0,0 +1 @@ +from instancemanager.instancemanager import InstanceManager diff --git a/e2e/libs/instancemanager/instancemanager.py b/e2e/libs/instancemanager/instancemanager.py new file mode 100644 index 0000000000..986de2c82c --- /dev/null +++ b/e2e/libs/instancemanager/instancemanager.py @@ -0,0 +1,35 @@ +import time + +from node import Node + +from utility.utility import get_longhorn_client +from utility.utility import get_retry_count_and_interval +from utility.utility import logging + + +class InstanceManager: + + def __init__(self): + self.node = Node() + + def wait_for_all_instance_manager_running(self): + longhorn_client = get_longhorn_client() + worker_nodes = self.node.list_node_names_by_role("worker") + + retry_count, retry_interval = get_retry_count_and_interval() + for i in range(retry_count): + try: + instance_managers = longhorn_client.list_instance_manager() + instance_manager_map = {} + for im in instance_managers: + if im.currentState == "running": + instance_manager_map[im.nodeID] = im + if len(instance_manager_map) == len(worker_nodes): + break + except Exception as e: + logging(f"Getting instance manager state error: {e}") + + logging(f"Waiting for all instance manager running, retry ({i}) ...") + time.sleep(retry_interval) + + assert len(instance_manager_map) == len(worker_nodes), f"expect all instance managers running, instance_managers = {instance_managers}, instance_manager_map = {instance_manager_map}" diff --git a/e2e/libs/instancemanager/utility.py b/e2e/libs/instancemanager/utility.py deleted file mode 100644 index d84e4c0099..0000000000 --- a/e2e/libs/instancemanager/utility.py +++ /dev/null @@ -1,29 +0,0 @@ -import time - -from node.utility import list_node_names_by_role - -from utility.utility import get_longhorn_client -from utility.utility import get_retry_count_and_interval -from utility.utility import logging - -def wait_for_all_instance_manager_running(): - longhorn_client = get_longhorn_client() - worker_nodes = list_node_names_by_role("worker") - - retry_count, retry_interval = get_retry_count_and_interval() - for i in range(retry_count): - try: - instance_managers = longhorn_client.list_instance_manager() - instance_manager_map = {} - for im in instance_managers: - if im.currentState == "running": - instance_manager_map[im.nodeID] = im - if len(instance_manager_map) == len(worker_nodes): - break - except Exception as e: - logging(f"Getting instance manager state error: {e}") - - logging(f"Waiting for all instance manager running, retry ({i}) ...") - time.sleep(retry_interval) - - assert len(instance_manager_map) == len(worker_nodes), f"expect all instance managers running, instance_managers = {instance_managers}, instance_manager_map = {instance_manager_map}" diff --git a/e2e/libs/keywords/common_keywords.py b/e2e/libs/keywords/common_keywords.py index 2dafabd997..0a64d102a9 100644 --- a/e2e/libs/keywords/common_keywords.py +++ b/e2e/libs/keywords/common_keywords.py @@ -1,6 +1,7 @@ from node_exec import NodeExec from utility.utility import init_k8s_api_client +from utility.utility import generate_name_with_suffix class common_keywords: @@ -17,3 +18,6 @@ def init_node_exec(self, test_name): def cleanup_node_exec(self): NodeExec.get_instance().cleanup() + + def generate_name_with_suffix(self, kind, suffix): + return generate_name_with_suffix(kind, suffix) diff --git a/e2e/libs/keywords/deployment_keywords.py b/e2e/libs/keywords/deployment_keywords.py index 046c26d064..841cab3de1 100644 --- a/e2e/libs/keywords/deployment_keywords.py +++ b/e2e/libs/keywords/deployment_keywords.py @@ -1,22 +1,32 @@ +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE +from utility.utility import logging + +from volume import Volume + from workload.deployment import create_deployment from workload.deployment import delete_deployment -from workload.persistentvolumeclaim import create_persistentvolumeclaim -from workload.persistentvolumeclaim import delete_persistentvolumeclaim -from workload.workload import get_workload_pvc_name +from workload.deployment import list_deployments class deployment_keywords: def __init__(self): - pass - - def cleanup_deployments(self, deployment_names): - for name in deployment_names: - pvc_name = get_workload_pvc_name(name) - delete_deployment(name) - delete_persistentvolumeclaim(pvc_name) - - def create_deployment(self, volume_type="rwo", option=""): - create_persistentvolumeclaim(volume_type, option) - deployment_name = create_deployment(volume_type, option) - return deployment_name + self.volume = Volume() + + def cleanup_deployments(self): + deployments = list_deployments( + label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}" + ) + + logging(f'Cleaning up {len(deployments.items)} deployments') + for deployment in deployments.items: + self.delete_deployment(deployment.metadata.name) + + def create_deployment(self, name, claim_name): + logging(f'Creating deployment {name}') + create_deployment(name, claim_name) + + def delete_deployment(self, name): + logging(f'Deleting deployment {name}') + delete_deployment(name) diff --git a/e2e/libs/keywords/host_keywords.py b/e2e/libs/keywords/host_keywords.py new file mode 100644 index 0000000000..9387d9249a --- /dev/null +++ b/e2e/libs/keywords/host_keywords.py @@ -0,0 +1,52 @@ +from robot.libraries.BuiltIn import BuiltIn + +from host import Host +from host.constant import NODE_REBOOT_DOWN_TIME_SECOND + +from node import Node + +from utility.utility import logging + + +class host_keywords: + + def __init__(self): + self.volume_keywords = BuiltIn().get_library_instance('volume_keywords') + + self.host = Host() + self.node = Node() + + def reboot_volume_node(self, volume_name): + node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node") + + logging(f'Rebooting volume {volume_name} node {node_id} with downtime {NODE_REBOOT_DOWN_TIME_SECOND} seconds') + self.host.reboot_node(node_id) + + def reboot_replica_node(self, volume_name): + node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "replica node") + + logging(f'Rebooting volume {volume_name} node {node_id} with downtime {NODE_REBOOT_DOWN_TIME_SECOND} seconds') + self.host.reboot_node(node_id) + + def reboot_node_by_index(self, idx, power_off_time_in_min=1): + node_name = self.node.get_node_by_index(idx) + reboot_down_time_min = int(power_off_time_in_min) * 60 + + logging(f'Rebooting node {node_name} with downtime {reboot_down_time_min} minutes') + self.host.reboot_node(node_name, reboot_down_time_min) + + def reboot_all_worker_nodes(self, power_off_time_in_min=1): + reboot_down_time_min = int(power_off_time_in_min) * 60 + + logging(f'Rebooting all worker nodes with downtime {reboot_down_time_min} minutes') + self.host.reboot_all_worker_nodes(reboot_down_time_min) + + def reboot_all_nodes(self): + logging(f'Rebooting all nodes with downtime {NODE_REBOOT_DOWN_TIME_SECOND} seconds') + self.host.reboot_all_nodes() + + def reboot_node_by_name(self, node_name, downtime_in_min=1): + reboot_down_time_min = int(downtime_in_min) * 60 + + logging(f'Rebooting node {node_name} with downtime {reboot_down_time_min} minutes') + self.host.reboot_node(node_name, reboot_down_time_min) diff --git a/e2e/libs/keywords/instancemanager_keywords.py b/e2e/libs/keywords/instancemanager_keywords.py new file mode 100644 index 0000000000..692cd6a5b0 --- /dev/null +++ b/e2e/libs/keywords/instancemanager_keywords.py @@ -0,0 +1,13 @@ +from instancemanager import InstanceManager + +from utility.utility import logging + + +class instancemanager_keywords: + + def __init__(self): + self.instancemanager = InstanceManager() + + def wait_for_all_instance_manager_running(self): + logging(f'Waiting for all instance manager running') + self.instancemanager.wait_for_all_instance_manager_running() diff --git a/e2e/libs/keywords/kubelet_keywords.py b/e2e/libs/keywords/kubelet_keywords.py index 58f33dec59..d42357418d 100644 --- a/e2e/libs/keywords/kubelet_keywords.py +++ b/e2e/libs/keywords/kubelet_keywords.py @@ -1,7 +1,10 @@ from kubelet.kubelet import restart_kubelet +from utility.utility import logging + class kubelet_keywords: - def restart_kubelet(self, node_name, stop_time_in_sec): - restart_kubelet(node_name, int(stop_time_in_sec)) + def restart_kubelet(self, node_name, downtime_in_sec): + logging(f'Restarting kubelet on node {node_name} with downtime {downtime_in_sec} seconds') + restart_kubelet(node_name, int(downtime_in_sec)) diff --git a/e2e/libs/keywords/network_keywords.py b/e2e/libs/keywords/network_keywords.py index 93a86797d5..63f03cecc8 100644 --- a/e2e/libs/keywords/network_keywords.py +++ b/e2e/libs/keywords/network_keywords.py @@ -1,10 +1,15 @@ from network.network import setup_control_plane_network_latency from network.network import cleanup_control_plane_network_latency +from utility.utility import logging + + class network_keywords: def setup_control_plane_network_latency(self): + logging(f"Setting up control plane network latency") setup_control_plane_network_latency() def cleanup_control_plane_network_latency(self): + logging(f"Cleaning up control plane network latency") cleanup_control_plane_network_latency() diff --git a/e2e/libs/keywords/node_keywords.py b/e2e/libs/keywords/node_keywords.py deleted file mode 100644 index 82c5e43ff2..0000000000 --- a/e2e/libs/keywords/node_keywords.py +++ /dev/null @@ -1,38 +0,0 @@ -from robot.libraries.BuiltIn import BuiltIn - -from instancemanager.utility import wait_for_all_instance_manager_running - -from node import Node -from node.utility import get_node_by_index - - -class node_keywords: - - def __init__(self): - self.node = Node() - - def reboot_volume_node(self, volume_name): - volume_keywords = BuiltIn().get_library_instance('volume_keywords') - volume_node = volume_keywords.get_replica_node_attached_to_volume(volume_name) - self.node.reboot_node(volume_node) - - def reboot_replica_node(self, volume_name): - volume_keywords = BuiltIn().get_library_instance('volume_keywords') - replica_node = volume_keywords.get_replica_node_not_attached_to_volume(volume_name) - self.node.reboot_node(replica_node) - - def reboot_node_by_index(self, idx, power_off_time_in_min=1): - node_name = get_node_by_index(idx) - self.node.reboot_node(node_name, int(power_off_time_in_min) * 60) - - def reboot_all_worker_nodes(self, power_off_time_in_min=1): - self.node.reboot_all_worker_nodes(int(power_off_time_in_min) * 60) - - def reboot_all_nodes(self): - self.node.reboot_all_nodes() - - def reboot_node_by_name(self, node_name, power_off_time_in_min=1): - self.node.reboot_node(node_name, int(power_off_time_in_min) * 60) - - def wait_for_all_instance_manager_running(self): - wait_for_all_instance_manager_running() diff --git a/e2e/libs/keywords/persistentvolumeclaim_keywords.py b/e2e/libs/keywords/persistentvolumeclaim_keywords.py index f995c7930d..1eab8097c3 100644 --- a/e2e/libs/keywords/persistentvolumeclaim_keywords.py +++ b/e2e/libs/keywords/persistentvolumeclaim_keywords.py @@ -1,5 +1,8 @@ from persistentvolumeclaim import PersistentVolumeClaim +from utility.constant import ANNOT_EXPANDED_SIZE +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import logging from volume.constant import MEBIBYTE @@ -8,9 +11,26 @@ class persistentvolumeclaim_keywords: def __init__(self): - self.pvc = PersistentVolumeClaim() + self.claim = PersistentVolumeClaim() - def expand_pvc_size_by_mib(self, claim_name, size_in_mib): - logging(f'Expanding PVC {claim_name} by {size_in_mib} MiB') + def cleanup_persistentvolumeclaims(self): + claims = self.claim.list(label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}") + + logging(f'Cleaning up {len(claims.items)} persistentvolumeclaims') + for claim in claims.items: + self.delete_persistentvolumeclaim(claim.metadata.name) + + def create_persistentvolumeclaim(self, name, volume_type="RWO", option=""): + logging(f'Creating persistentvolumeclaim {name}') + return self.claim.create(name, volume_type, option) + + def delete_persistentvolumeclaim(self, name): + logging(f'Deleting persistentvolumeclaim {name}') + return self.claim.delete(name) + + def expand_persistentvolumeclaim_size_by_mib(self, claim_name, size_in_mib): size_in_byte = int(size_in_mib) * MEBIBYTE - return self.pvc.expand(claim_name, size_in_byte) + expanded_size = self.claim.expand(claim_name, size_in_byte) + + logging(f'Expanding persistentvolumeclaim {claim_name} by {size_in_mib} MiB') + self.claim.set_annotation(claim_name, ANNOT_EXPANDED_SIZE, str(expanded_size)) diff --git a/e2e/libs/keywords/recurringjob_keywords.py b/e2e/libs/keywords/recurringjob_keywords.py index 8e91bc86c3..06bc220f2f 100644 --- a/e2e/libs/keywords/recurringjob_keywords.py +++ b/e2e/libs/keywords/recurringjob_keywords.py @@ -1,5 +1,7 @@ from recurringjob import RecurringJob +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import logging @@ -8,22 +10,30 @@ class recurringjob_keywords: def __init__(self): self.recurringjob = RecurringJob() + def cleanup_recurringjobs(self): + recurringjobs = self.recurringjob.list( + label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}" + ) + + logging(f'Cleaning up {len(recurringjobs["items"])} recurringjobs') + for recurringjob in recurringjobs['items']: + self.recurringjob.delete(recurringjob['metadata']['name']) + def create_snapshot_recurringjob_for_volume(self, volume_name): job_name = volume_name + '-snap' + + logging(f'Creating snapshot recurringjob {job_name} for volume {volume_name}') self.recurringjob.create(job_name, task="snapshot") self.recurringjob.add_to_volume(job_name, volume_name) - self.recurringjob.get(job_name) - logging(f'Created recurringjob {job_name} for volume {volume_name}') + def create_backup_recurringjob_for_volume(self, volume_name): job_name = volume_name + '-bak' + + logging(f'Creating backup recurringjob {job_name} for volume {volume_name}') self.recurringjob.create(job_name, task="backup") self.recurringjob.add_to_volume(job_name, volume_name) - self.recurringjob.get(job_name) - logging(f'Created recurringjob {job_name} for volume {volume_name}') def check_recurringjobs_work(self, volume_name): + logging(f'Checking recurringjobs work for volume {volume_name}') self.recurringjob.check_jobs_work(volume_name) - - def cleanup_recurringjobs(self, volume_names): - self.recurringjob.cleanup(volume_names) diff --git a/e2e/libs/keywords/statefulset_keywords.py b/e2e/libs/keywords/statefulset_keywords.py index 1cdca2c4a6..4ffba42270 100644 --- a/e2e/libs/keywords/statefulset_keywords.py +++ b/e2e/libs/keywords/statefulset_keywords.py @@ -1,33 +1,66 @@ -from workload.persistentvolumeclaim import delete_persistentvolumeclaim +from robot.libraries.BuiltIn import BuiltIn + +from persistentvolumeclaim import PersistentVolumeClaim + +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE +from utility.utility import logging + +from volume import Volume + from workload.statefulset import create_statefulset from workload.statefulset import delete_statefulset from workload.statefulset import get_statefulset +from workload.statefulset import list_statefulsets from workload.statefulset import scale_statefulset from workload.statefulset import wait_for_statefulset_replicas_ready -from workload.workload import get_workload_pvc_name class statefulset_keywords: def __init__(self): - pass + self.persistentvolumeclaim = PersistentVolumeClaim() + self.volume = Volume() + + def cleanup_statefulsets(self): + statefulsets = list_statefulsets(label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}") + + logging(f'Cleaning up {len(statefulsets.items)} statefulsets') + for statefulset in statefulsets.items: + self.delete_statefulset(statefulset.metadata.name) - def cleanup_statefulsets(self, statefulset_names): - for name in statefulset_names: - pvc_name = get_workload_pvc_name(name) - delete_statefulset(name) - delete_persistentvolumeclaim(pvc_name) + def create_statefulset(self, name, volume_type="RWO", option=""): + logging(f'Creating statefulset {name}') + create_statefulset(name, volume_type, option) - def create_statefulset(self, volume_type="rwo", option=""): - statefulset_name = create_statefulset(volume_type, option) - return statefulset_name + def delete_statefulset(self, name): + logging(f'Deleting statefulset {name}') + delete_statefulset(name) def get_statefulset(self, statefulset_name): return get_statefulset(statefulset_name) def scale_statefulset(self, statefulset_name, replica_count): + logging(f'Scaling statefulset {statefulset_name} to {replica_count}') return scale_statefulset(statefulset_name, replica_count) + def scale_statefulset_down(self, statefulset_name): + logging(f'Scaling statefulset {statefulset_name} down') + scale_statefulset(statefulset_name, 0) + + workload_keywords = BuiltIn().get_library_instance('workload_keywords') + workload_keywords.wait_for_workload_volume_detached(statefulset_name) + + def scale_statefulset_up(self, statefulset_name, replicaset_count=3): + logging(f'Scaling statefulset {statefulset_name} up to {replicaset_count}') + scale_statefulset(statefulset_name, replicaset_count) + + workload_keywords = BuiltIn().get_library_instance('workload_keywords') + workload_keywords.wait_for_workload_volume_healthy(statefulset_name) + + self.wait_for_statefulset_replicas_ready(statefulset_name, replicaset_count) + def wait_for_statefulset_replicas_ready(self, statefulset_name, expected_ready_count): - return wait_for_statefulset_replicas_ready(statefulset_name, expected_ready_count) + logging(f'Waiting for statefulset {statefulset_name} to have {expected_ready_count} replicas ready') + wait_for_statefulset_replicas_ready(statefulset_name, expected_ready_count) diff --git a/e2e/libs/keywords/stress_keywords.py b/e2e/libs/keywords/stress_keywords.py index f9b9928d44..afb3826716 100644 --- a/e2e/libs/keywords/stress_keywords.py +++ b/e2e/libs/keywords/stress_keywords.py @@ -1,26 +1,56 @@ -from robot.libraries.BuiltIn import BuiltIn - +from node import Node from node import Stress -from node.utility import list_node_names_by_role -from node.utility import list_node_names_by_volumes + +from volume import Volume + +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE +from utility.utility import logging class stress_keywords: def __init__(self): + self.node = Node() self.stress = Stress() + self.volume = Volume() def cleanup_stress_helper(self): + logging(f'Cleaning up stress helper') self.stress.cleanup() def stress_node_cpu_by_role(self, role): - self.stress.cpu(list_node_names_by_role(role)) + logging(f'Stressing node CPU for {role} nodes') + self.stress.cpu(self.node.list_node_names_by_role(role)) + + def stress_node_cpu_by_volume(self, volume_name): + logging(f'Stressing node CPU for volume {volume_name}') + self.stress_node_cpu_by_volumes([volume_name]) def stress_node_cpu_by_volumes(self, volume_names): - self.stress.cpu(list_node_names_by_volumes(volume_names)) + logging(f'Stressing node CPU for volumes {volume_names}') + self.stress.cpu(self.node.list_node_names_by_volumes(volume_names)) + + def stress_node_cpu_of_all_volumes(self): + volume_names = self.volume.list_names(label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}") + + logging(f'Stressing node CPU for all volumes {volume_names}') + self.stress_node_cpu_by_volumes(volume_names) def stress_node_memory_by_role(self, role): - self.stress.memory(list_node_names_by_role(role)) + logging(f'Stressing node memory for {role} nodes') + self.stress.memory(self.node.list_node_names_by_role(role)) + + def stress_node_memory_by_volume(self, volume_name): + logging(f'Stressing node memory for volume {volume_name}') + self.stress_node_memory_by_volumes([volume_name]) def stress_node_memory_by_volumes(self, volume_names): - self.stress.memory(list_node_names_by_volumes(volume_names)) + logging(f'Stressing node memory for volumes {volume_names}') + self.stress.memory(self.node.list_node_names_by_volumes(volume_names)) + + def stress_node_memory_of_all_volumes(self): + volume_names = self.volume.list_names(label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}") + + logging(f'Stressing node memory for all volumes {volume_names}') + self.stress_node_memory_by_volumes(volume_names) diff --git a/e2e/libs/keywords/volume_keywords.py b/e2e/libs/keywords/volume_keywords.py index 7a5965157d..fd3eb3e67d 100644 --- a/e2e/libs/keywords/volume_keywords.py +++ b/e2e/libs/keywords/volume_keywords.py @@ -1,9 +1,9 @@ -from node.utility import get_node_by_index -from node.utility import get_test_pod_running_node -from node.utility import get_test_pod_not_running_node -from node.utility import list_node_names_by_role +from node import Node +from node.utility import check_replica_locality -from utility.utility import generate_volume_name +from utility.constant import ANNOT_CHECKSUM +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import logging from volume import Volume @@ -12,16 +12,26 @@ class volume_keywords: def __init__(self): + self.node = Node() self.volume = Volume() - def create_volume(self, size, replica_count): - volume_name = generate_volume_name() + def cleanup_volumes(self): + volumes = self.volume.list(label_selector=f"{LABEL_TEST}={LABEL_TEST_VALUE}") + + logging(f'Cleaning up {len(volumes["items"])} volumes') + for volume in volumes['items']: + self.delete_volume(volume['metadata']['name']) + + def create_volume(self, volume_name, size, replica_count): logging(f'Creating volume {volume_name}') self.volume.create(volume_name, size, replica_count) - return volume_name + + def delete_volume(self, volume_name): + logging(f'Deleting volume {volume_name}') + self.volume.delete(volume_name) def attach_volume(self, volume_name): - attach_node = get_test_pod_not_running_node() + attach_node = self.node.get_test_pod_not_running_node() logging(f'Attaching volume {volume_name} to {attach_node}') self.volume.attach(volume_name, attach_node) @@ -33,60 +43,123 @@ def wait_for_volume_expand_to_size(self, volume_name, size): logging(f'Waiting for volume {volume_name} expand to {size}') return self.volume.wait_for_volume_expand_to_size(volume_name, size) - def get_replica_node_attached_to_volume(self, volume_name): - volume = self.volume.get(volume_name) - return volume['spec']['nodeID'] + def get_replica_node_ids(self, volume_name): + node_ids = [] + node_ids.extend(self.get_node_ids_by_replica_locality(volume_name, "volume node")) + node_ids.extend(self.get_node_ids_by_replica_locality(volume_name, "replica node")) + node_ids.extend(self.get_node_ids_by_replica_locality(volume_name, "test pod node")) + return node_ids + + def get_node_id_by_replica_locality(self, volume_name, replica_locality): + return self.get_node_ids_by_replica_locality(volume_name, replica_locality)[0] + + def get_node_ids_by_replica_locality(self, volume_name, replica_locality): + check_replica_locality(replica_locality) + + if replica_locality == "volume node": + volume = self.volume.get(volume_name) + return [volume['spec']['nodeID']] + + worker_nodes = self.node.list_node_names_by_role("worker") + volume_node = self.get_node_ids_by_replica_locality(volume_name, "volume node") + replica_nodes = [node for node in worker_nodes if node != volume_node] + test_pod_node = self.node.get_test_pod_running_node() - def get_replica_node_not_attached_to_volume(self, volume_name): - worker_nodes = list_node_names_by_role("worker") - volume_node = self.get_replica_node_attached_to_volume(volume_name) - test_pod_running_node = get_test_pod_running_node() - for worker_node in worker_nodes: - if worker_node != volume_node and worker_node != test_pod_running_node: - return worker_node + if replica_locality == "test pod node": + if test_pod_node in replica_nodes: + return [test_pod_node] + + elif replica_locality == "replica node": + return [node for node in replica_nodes if node != test_pod_node] + + else: + raise ValueError(f"Unknown replica locality {replica_locality}") + + raise Exception(f"Failed to get node ID of the replica on {replica_locality}") def write_volume_random_data(self, volume_name, size_in_mb): - return self.volume.write_random_data(volume_name, size_in_mb) + logging(f'Writing {size_in_mb} MB random data to volume {volume_name}') + checksum = self.volume.write_random_data(volume_name, size_in_mb) + + self.volume.set_annotation(volume_name, ANNOT_CHECKSUM, checksum) def keep_writing_data(self, volume_name): + logging(f'Keep writing data to volume {volume_name}') self.volume.keep_writing_data(volume_name) - def check_data_checksum(self, volume_name, checksum): - logging(f"Checking volume {volume_name} data with checksum {checksum}") + def check_data_checksum(self, volume_name): + checksum = self.volume.get_annotation_value(volume_name, ANNOT_CHECKSUM) + + logging(f"Checking volume {volume_name} data checksum is {checksum}") self.volume.check_data_checksum(volume_name, checksum) def delete_replica(self, volume_name, replica_node): if str(replica_node).isdigit(): - replica_node = get_node_by_index(replica_node) + replica_node = self.node.get_node_by_index(replica_node) + logging(f"Deleting volume {volume_name}'s replica on node {replica_node}") self.volume.delete_replica(volume_name, replica_node) + def delete_replica_on_node(self, volume_name, replica_locality): + check_replica_locality(replica_locality) + + node_id = self.get_node_id_by_replica_locality(volume_name, replica_locality) + + logging(f"Deleting volume {volume_name}'s replica on node {node_id}") + self.volume.delete_replica(volume_name, node_id) + + def set_annotation(self, volume_name, annotation_key, annotation_value): + self.volume.set_annotation(volume_name, annotation_key, annotation_value) + def wait_for_replica_rebuilding_start(self, volume_name, replica_node): if str(replica_node).isdigit(): - replica_node = get_node_by_index(replica_node) + replica_node = self.node.get_node_by_index(replica_node) + logging(f"Waiting for volume {volume_name}'s replica on node {replica_node} rebuilding started") self.volume.wait_for_replica_rebuilding_start( volume_name, replica_node ) + def wait_for_replica_rebuilding_to_start_on_node(self, volume_name, replica_locality): + check_replica_locality(replica_locality) + + node_id = self.get_node_id_by_replica_locality(volume_name, replica_locality) + + logging(f"Waiting for volume {volume_name}'s replica on node {node_id} rebuilding started") + self.volume.wait_for_replica_rebuilding_start(volume_name, node_id) + def wait_for_replica_rebuilding_complete(self, volume_name, replica_node): if str(replica_node).isdigit(): - replica_node = get_node_by_index(replica_node) + replica_node = self.node.get_node_by_index(replica_node) + logging(f"Waiting for volume {volume_name}'s replica on node {replica_node} rebuilding completed") self.volume.wait_for_replica_rebuilding_complete( volume_name, replica_node ) + def wait_for_replica_rebuilding_to_complete_on_node(self, volume_name, replica_locality): + check_replica_locality(replica_locality) + + node_id = self.get_node_id_by_replica_locality(volume_name, replica_locality) + + logging(f"Waiting for volume {volume_name}'s replica on node {node_id} rebuilding completed") + self.volume.wait_for_replica_rebuilding_complete(volume_name, node_id) + + def wait_for_replica_rebuilding_to_complete(self, volume_name): + for node_id in self.get_replica_node_ids(volume_name): + logging(f"Waiting for volume {volume_name}'s replica on node {node_id} rebuilding completed") + self.volume.wait_for_replica_rebuilding_complete(volume_name, node_id) + def wait_for_volume_attached(self, volume_name): + logging(f'Waiting for volume {volume_name} to be attached') self.volume.wait_for_volume_attached(volume_name) def wait_for_volume_detached(self, volume_name): + logging(f'Waiting for volume {volume_name} to be detached') self.volume.wait_for_volume_detached(volume_name) def wait_for_volume_healthy(self, volume_name): + logging(f'Waiting for volume {volume_name} to be healthy') self.volume.wait_for_volume_healthy(volume_name) - - def cleanup_volumes(self, volume_names): - self.volume.cleanup(volume_names) diff --git a/e2e/libs/keywords/workload_keywords.py b/e2e/libs/keywords/workload_keywords.py index fe344d872a..6316691880 100644 --- a/e2e/libs/keywords/workload_keywords.py +++ b/e2e/libs/keywords/workload_keywords.py @@ -1,22 +1,44 @@ -from workload.deployment import create_deployment -from workload.deployment import delete_deployment -from workload.persistentvolumeclaim import create_persistentvolumeclaim -from workload.persistentvolumeclaim import delete_persistentvolumeclaim +import multiprocessing + +from deployment_keywords import deployment_keywords +from kubelet_keywords import kubelet_keywords +from host_keywords import host_keywords +from statefulset_keywords import statefulset_keywords +from volume_keywords import volume_keywords + +from persistentvolumeclaim import PersistentVolumeClaim + +from workload.pod import get_volume_name_by_pod from workload.workload import check_pod_data_checksum from workload.workload import create_storageclass from workload.workload import delete_storageclass from workload.workload import get_workload_pod_names -from workload.workload import get_workload_pvc_name +from workload.workload import get_workload_persistent_volume_claim_name from workload.workload import get_workload_volume_name from workload.workload import keep_writing_pod_data from workload.workload import write_pod_random_data -from workload.workload import wait_for_workload_pod_stable +from workload.workload import wait_for_workload_pods_running +from workload.workload import wait_for_workload_pods_stable + +from utility.constant import ANNOT_CHECKSUM +from utility.constant import ANNOT_EXPANDED_SIZE +from utility.utility import logging + +from volume import Volume +from volume.constant import MEBIBYTE class workload_keywords: def __init__(self): - pass + self.deployment_keywords = deployment_keywords() + self.kubelet_keywords = kubelet_keywords() + self.host_keywords = host_keywords() + self.statefulset_keywords = statefulset_keywords() + self.volume_keywords = volume_keywords() + + self.persistentvolumeclaim = PersistentVolumeClaim() + self.volume = Volume() def init_storageclasses(self): create_storageclass('longhorn-test') @@ -26,34 +48,90 @@ def cleanup_storageclasses(self): delete_storageclass('longhorn-test') delete_storageclass('longhorn-test-strict-local') - def create_deployment(self, volume_type="rwo", option=""): - create_persistentvolumeclaim(volume_type, option) - deployment_name = create_deployment(volume_type, option) - return deployment_name + def check_pod_data_checksum(self, expected_checksum, pod_name, file_name): + logging(f'Checking checksum for file {file_name} in pod {pod_name}') + check_pod_data_checksum(expected_checksum, pod_name, file_name) def get_workload_pod_name(self, workload_name): return get_workload_pod_names(workload_name)[0] - def get_workload_pvc_name(self, workload_name): - return get_workload_pvc_name(workload_name) + def get_workload_persistent_volume_claim_name(self, workload_name): + return get_workload_persistent_volume_claim_name(workload_name) def get_workload_volume_name(self, workload_name): return get_workload_volume_name(workload_name) - def keep_writing_pod_data(self, pod_name): - return keep_writing_pod_data(pod_name) + def write_workload_pod_random_data(self, workload_name, size_in_mb, file_name): + pod_name = get_workload_pod_names(workload_name)[0] + + logging(f'Writing {size_in_mb} MB random data to pod {pod_name}') + checksum = write_pod_random_data(pod_name, size_in_mb, file_name) + + volume = get_volume_name_by_pod(pod_name) + self.volume_keywords.set_annotation(volume, ANNOT_CHECKSUM, checksum) + + def check_workload_pod_data_checksum(self, workload_name, file_name): + pod_name = get_workload_pod_names(workload_name)[0] + volume = get_volume_name_by_pod(pod_name) + expected_checksum = self.volume.get_annotation_value(volume, ANNOT_CHECKSUM) + + logging(f'Checking checksum for file {file_name} in pod {pod_name}') + check_pod_data_checksum(expected_checksum, pod_name, file_name) + + def keep_writing_workload_pod_data(self, workload_name): + pod_name = get_workload_pod_names(workload_name)[0] + + logging(f'Keep writing data to pod {pod_name}') + keep_writing_pod_data(pod_name) + + def reboot_workload_volume_node(self, workload_name, downtime_in_min=1): + volume_name = get_workload_volume_name(workload_name) + node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node") + self.host_keywords.reboot_node_by_name(node_id, downtime_in_min=downtime_in_min) + + def restart_workload_kubelet(self, workload_name, downtime_in_sec): + volume_name = get_workload_volume_name(workload_name) + node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node") + self.kubelet_keywords.restart_kubelet(node_id, downtime_in_sec) + + def wait_for_workload_pods_running(self, workload_name, namespace="default"): + logging(f'Waiting for {namespace} workload {workload_name} pods running') + wait_for_workload_pods_running(workload_name, namespace=namespace) + + def wait_for_workloads_pods_running(self, workload_names, namespace="default"): + logging(f'Waiting for {namespace} workloads {workload_names} pods running') + with multiprocessing.Pool(processes=len(workload_names)) as pool: + pool.starmap(wait_for_workload_pods_running, [(name, namespace) for name in workload_names]) + + pool.join() + + def wait_for_workload_pods_stable(self, workload_name, namespace="default"): + logging(f'Waiting for {namespace} workload {workload_name} pod stable') + wait_for_workload_pods_stable(workload_name, namespace=namespace) + + def wait_for_workload_volume_healthy(self, workload_name): + volume_name = get_workload_volume_name(workload_name) + + logging(f'Waiting for workload {workload_name} volume {volume_name} to be healthy') + self.volume_keywords.wait_for_volume_healthy(volume_name) + + def wait_for_workload_volume_detached(self, workload_name): + volume_name = get_workload_volume_name(workload_name) + + logging(f'Waiting for {workload_name} volume {volume_name} to be detached') + self.volume.wait_for_volume_detached(volume_name) - def write_pod_random_data(self, pod, size_in_mb): - return write_pod_random_data(pod, size_in_mb) + def expand_workload_claim_size_by_mib(self, workload_name, size_in_mib, claim_index=0): + claim_name = get_workload_persistent_volume_claim_name(workload_name, index=claim_index) + size_in_byte = int(size_in_mib) * MEBIBYTE - def check_pod_data_checksum(self, pod_name, checksum): - check_pod_data_checksum(pod_name, checksum) + logging(f'Expanding {workload_name} persistentvolumeclaim {claim_name} by {size_in_mib} MiB') + self.persistentvolumeclaim.expand(claim_name, size_in_byte) - def cleanup_deployments(self, deployment_names): - for name in deployment_names: - pvc_name = get_workload_pvc_name(name) - delete_deployment(name) - delete_persistentvolumeclaim(pvc_name) + def wait_for_workload_claim_size_expanded(self, workload_name, claim_index=0): + claim_name = get_workload_persistent_volume_claim_name(workload_name, index=claim_index) + expanded_size = self.persistentvolumeclaim.get_annotation_value(claim_name, ANNOT_EXPANDED_SIZE) + volume_name = get_workload_volume_name(workload_name) - def wait_for_workload_pod_stable(self, workload_name): - return wait_for_workload_pod_stable(workload_name) + logging(f'Waiting for {workload_name} volume {volume_name} to expand to {expanded_size}') + self.volume.wait_for_volume_expand_to_size(volume_name, expanded_size) diff --git a/e2e/libs/kubelet/kubelet.py b/e2e/libs/kubelet/kubelet.py index 06beb039da..b1ff67334f 100644 --- a/e2e/libs/kubelet/kubelet.py +++ b/e2e/libs/kubelet/kubelet.py @@ -1,22 +1,21 @@ import time +from workload.constant import IMAGE_UBUNTU from workload.pod import create_pod from workload.pod import delete_pod from workload.pod import new_pod_manifest -from workload.constant import IMAGE_UBUNTU - -def restart_kubelet(node_name, stop_time_in_sec=10): +def restart_kubelet(node_name, downtime_in_sec=10): manifest = new_pod_manifest( image=IMAGE_UBUNTU, command=["/bin/bash"], - args=["-c", f"sleep 10 && systemctl stop k3s-agent && sleep {stop_time_in_sec} && systemctl start k3s-agent"], + args=["-c", f"sleep 10 && systemctl stop k3s-agent && sleep {downtime_in_sec} && systemctl start k3s-agent"], node_name=node_name ) pod_name = manifest['metadata']['name'] create_pod(manifest, is_wait_for_pod_running=True) - time.sleep(stop_time_in_sec) + time.sleep(downtime_in_sec) delete_pod(pod_name) diff --git a/e2e/libs/network/network.py b/e2e/libs/network/network.py index b775430f52..c06f29a028 100644 --- a/e2e/libs/network/network.py +++ b/e2e/libs/network/network.py @@ -1,11 +1,9 @@ from robot.libraries.BuiltIn import BuiltIn -from node.utility import list_node_names_by_role - +from node import Node from node_exec import NodeExec - def get_control_plane_node_network_latency_in_ms(): latency_in_ms = int(BuiltIn().get_variable_value("${CONTROL_PLANE_NODE_NETWORK_LATENCY_IN_MS}", default="0")) return latency_in_ms @@ -14,7 +12,7 @@ def get_control_plane_node_network_latency_in_ms(): def setup_control_plane_network_latency(): latency_in_ms = get_control_plane_node_network_latency_in_ms() if latency_in_ms != 0: - control_plane_nodes = list_node_names_by_role("control-plane") + control_plane_nodes = Node.list_node_names_by_role("control-plane") for control_plane_node in control_plane_nodes: cmd = f"tc qdisc replace dev eth0 root netem delay {latency_in_ms}ms" res = NodeExec.get_instance().issue_cmd(control_plane_node, cmd) @@ -26,7 +24,7 @@ def setup_control_plane_network_latency(): def cleanup_control_plane_network_latency(): latency_in_ms = get_control_plane_node_network_latency_in_ms() if latency_in_ms != 0: - control_plane_nodes = list_node_names_by_role("control-plane") + control_plane_nodes = Node.list_node_names_by_role("control-plane") for control_plane_node in control_plane_nodes: cmd = "tc qdisc del dev eth0 root" res = NodeExec.get_instance().issue_cmd(control_plane_node, cmd) diff --git a/e2e/libs/node/node.py b/e2e/libs/node/node.py index 4938a473ad..cdc6edd543 100644 --- a/e2e/libs/node/node.py +++ b/e2e/libs/node/node.py @@ -1,73 +1,17 @@ -import boto3 +import os import time -import yaml from kubernetes import client -from node.utility import list_node_names_by_role +from robot.libraries.BuiltIn import BuiltIn -from utility.utility import logging -from utility.utility import wait_for_cluster_ready +from utility.utility import get_retry_count_and_interval class Node: def __init__(self): - with open('/tmp/instance_mapping', 'r') as f: - self.mapping = yaml.safe_load(f) - self.aws_client = boto3.client('ec2') - - def reboot_all_nodes(self, shut_down_time_in_sec=60): - instance_ids = [value for value in self.mapping.values()] - - resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True) - logging(f"Stopping instances {instance_ids} response: {resp}") - waiter = self.aws_client.get_waiter('instance_stopped') - waiter.wait(InstanceIds=instance_ids) - logging(f"Stopped instances") - - time.sleep(shut_down_time_in_sec) - - resp = self.aws_client.start_instances(InstanceIds=instance_ids) - logging(f"Starting instances {instance_ids} response: {resp}") - waiter = self.aws_client.get_waiter('instance_running') - waiter.wait(InstanceIds=instance_ids) - wait_for_cluster_ready() - logging(f"Started instances") - - def reboot_node(self, reboot_node_name, shut_down_time_in_sec=60): - instance_ids = [self.mapping[reboot_node_name]] - - resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True) - logging(f"Stopping instances {instance_ids} response: {resp}") - waiter = self.aws_client.get_waiter('instance_stopped') - waiter.wait(InstanceIds=instance_ids) - logging(f"Stopped instances") - - time.sleep(shut_down_time_in_sec) - - resp = self.aws_client.start_instances(InstanceIds=instance_ids) - logging(f"Starting instances {instance_ids} response: {resp}") - waiter = self.aws_client.get_waiter('instance_running') - waiter.wait(InstanceIds=instance_ids) - logging(f"Started instances") - - def reboot_all_worker_nodes(self, shut_down_time_in_sec=60): - instance_ids = [self.mapping[value] for value in list_node_names_by_role("worker")] - - resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True) - logging(f"Stopping instances {instance_ids} response: {resp}") - waiter = self.aws_client.get_waiter('instance_stopped') - waiter.wait(InstanceIds=instance_ids) - logging(f"Stopped instances") - - time.sleep(shut_down_time_in_sec) - - resp = self.aws_client.start_instances(InstanceIds=instance_ids) - logging(f"Starting instances {instance_ids} response: {resp}") - waiter = self.aws_client.get_waiter('instance_running') - waiter.wait(InstanceIds=instance_ids) - logging(f"Started instances") + pass def get_all_pods_on_node(self, node_name): api = client.CoreV1Api() @@ -76,7 +20,8 @@ def get_all_pods_on_node(self, node_name): return user_pods def wait_all_pods_evicted(self, node_name): - for i in range(RETRY_COUNT): + retry_count, retry_interval = get_retry_count_and_interval() + for _ in range(retry_count): pods = self.get_all_pods_on_node(node_name) evicted = True for pod in pods: @@ -91,6 +36,63 @@ def wait_all_pods_evicted(self, node_name): if evicted: break - time.sleep(RETRY_INTERVAL) + time.sleep(retry_interval) assert evicted, 'failed to evict pods' + + def get_node_by_index(self, index, role="worker"): + nodes = self.list_node_names_by_role(role) + return nodes[int(index)] + + def get_node_by_name(self, node_name): + core_api = client.CoreV1Api() + return core_api.read_node(node_name) + + def get_test_pod_running_node(self): + if "NODE_NAME" in os.environ: + return os.environ["NODE_NAME"] + else: + return self.get_node_by_index(0) + + def get_test_pod_not_running_node(self): + worker_nodes = self.list_node_names_by_role("worker") + test_pod_running_node = self.get_test_pod_running_node() + for worker_node in worker_nodes: + if worker_node != test_pod_running_node: + return worker_node + + def get_node_cpu_cores(self, node_name): + node = self.get_node_by_name(node_name) + return node.status.capacity['cpu'] + + def list_node_names_by_volumes(self, volume_names): + volume_keywords = BuiltIn().get_library_instance('volume_keywords') + volume_nodes = {} + for volume_name in volume_names: + volume_node = volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node") + if volume_node not in volume_nodes: + volume_nodes[volume_node] = True + return list(volume_nodes.keys()) + + def list_node_names_by_role(self, role="all"): + if role not in ["all", "control-plane", "worker"]: + raise ValueError("Role must be one of 'all', 'master' or 'worker'") + + def filter_nodes(nodes, condition): + return [node.metadata.name for node in nodes if condition(node)] + + core_api = client.CoreV1Api() + nodes = core_api.list_node().items + + control_plane_labels = ['node-role.kubernetes.io/master', 'node-role.kubernetes.io/control-plane'] + + if role == "all": + return sorted(filter_nodes(nodes, lambda node: True)) + + if role == "control-plane": + condition = lambda node: all(label in node.metadata.labels for label in control_plane_labels) + return sorted(filter_nodes(nodes, condition)) + + if role == "worker": + condition = lambda node: not any(label in node.metadata.labels for label in control_plane_labels) + return sorted(filter_nodes(nodes, condition)) diff --git a/e2e/libs/node/stress.py b/e2e/libs/node/stress.py index b293103a5c..2aacb1041d 100644 --- a/e2e/libs/node/stress.py +++ b/e2e/libs/node/stress.py @@ -1,7 +1,6 @@ from kubernetes.client.rest import ApiException -from node.utility import get_node_cpu_cores - +from node import Node from node.constant import NODE_STRESS_CPU_LOAD_PERCENTAGE from node.constant import NODE_STRESS_MEM_LOAD_PERCENTAGE from node.constant import NODE_STRESS_MEM_VM_WORKERS @@ -11,19 +10,22 @@ from utility.utility import logging +from workload.constant import IMAGE_LITMUX from workload.pod import create_pod from workload.pod import delete_pod from workload.pod import get_pod from workload.pod import new_pod_manifest from workload.workload import get_workload_pods -from workload.constant import IMAGE_LITMUX - class Stress: + + def __init__(self) -> None: + self.node = Node() + def cleanup(self): for pod in get_workload_pods(STRESS_HELPER_LABEL): - logging(f"Cleaning up stress pod {pod.metadata.name}") + logging(f"Deleting stress pod {pod.metadata.name}") delete_pod(pod.metadata.name, pod.metadata.namespace) def cpu(self, node_names): @@ -47,7 +49,7 @@ def cpu(self, node_names): pod_name=pod_name, image=IMAGE_LITMUX, command=["stress-ng"], - args=['--cpu', str(get_node_cpu_cores(node_name)), + args=['--cpu', str(self.node.get_node_cpu_cores(node_name)), '--cpu-load', str(NODE_STRESS_CPU_LOAD_PERCENTAGE), '--timeout', str(NODE_STRESS_TIMEOUT_SECOND)], node_name=node_name, diff --git a/e2e/libs/node/utility.py b/e2e/libs/node/utility.py index 8f5c957b5e..b8230d2fa3 100644 --- a/e2e/libs/node/utility.py +++ b/e2e/libs/node/utility.py @@ -1,70 +1,2 @@ -import os - -from robot.libraries.BuiltIn import BuiltIn - -from kubernetes import client - - -def get_test_pod_running_node(): - if "NODE_NAME" in os.environ: - return os.environ["NODE_NAME"] - else: - return get_node_by_index(0) - - -def get_test_pod_not_running_node(): - worker_nodes = list_node_names_by_role("worker") - test_pod_running_node = get_test_pod_running_node() - for worker_node in worker_nodes: - if worker_node != test_pod_running_node: - return worker_node - - -def get_node_by_index(index, role="worker"): - nodes = list_node_names_by_role(role) - return nodes[int(index)] - - -def get_node_by_name(node_name): - core_api = client.CoreV1Api() - return core_api.read_node(node_name) - - -def get_node_cpu_cores(node_name): - node = get_node_by_name(node_name) - return node.status.capacity['cpu'] - - -def list_node_names_by_role(role="all"): - if role not in ["all", "control-plane", "worker"]: - raise ValueError("Role must be one of 'all', 'master' or 'worker'") - - def filter_nodes(nodes, condition): - return [node.metadata.name for node in nodes if condition(node)] - - core_api = client.CoreV1Api() - nodes = core_api.list_node().items - - control_plane_labels = ['node-role.kubernetes.io/master', 'node-role.kubernetes.io/control-plane'] - - if role == "all": - return sorted(filter_nodes(nodes, lambda node: True)) - - if role == "control-plane": - condition = lambda node: all(label in node.metadata.labels for label in control_plane_labels) - return sorted(filter_nodes(nodes, condition)) - - if role == "worker": - condition = lambda node: not any(label in node.metadata.labels for label in control_plane_labels) - return sorted(filter_nodes(nodes, condition)) - - -def list_node_names_by_volumes(volume_names): - volume_nodes = {} - volume_keywords = BuiltIn().get_library_instance('volume_keywords') - - for volume_name in volume_names: - volume_node = volume_keywords.get_replica_node_attached_to_volume(volume_name) - if volume_node not in volume_nodes: - volume_nodes[volume_node] = True - return list(volume_nodes.keys()) +def check_replica_locality(replica_locality): + assert replica_locality in ["replica node", "test pod node", "volume node"] diff --git a/e2e/libs/node_exec/node_exec.py b/e2e/libs/node_exec/node_exec.py index dd6aee3c1d..b8e5bb2543 100644 --- a/e2e/libs/node_exec/node_exec.py +++ b/e2e/libs/node_exec/node_exec.py @@ -3,12 +3,13 @@ from kubernetes import client from kubernetes.stream import stream +from node_exec.constant import DEFAULT_POD_INTERVAL +from node_exec.constant import DEFAULT_POD_TIMEOUT + from utility.utility import logging -from workload.pod import wait_delete_pod from utility.utility import wait_delete_ns -from node_exec.constant import DEFAULT_POD_INTERVAL -from node_exec.constant import DEFAULT_POD_TIMEOUT +from workload.pod import wait_delete_pod class NodeExec: diff --git a/e2e/libs/persistentvolumeclaim/crd.py b/e2e/libs/persistentvolumeclaim/crd.py index 4f6ef8999d..1556835d97 100644 --- a/e2e/libs/persistentvolumeclaim/crd.py +++ b/e2e/libs/persistentvolumeclaim/crd.py @@ -16,19 +16,50 @@ def get(self, claim_name, claim_namespace="default"): namespace=claim_namespace, ) + def list(self, claim_namespace="default", label_selector=None): + return self.core_v1_api.list_namespaced_persistent_volume_claim( + namespace=claim_namespace, + label_selector=label_selector + ) + + def set_annotation(self, claim_name, annotation_key, annotation_value, claim_namespace="default"): + for _ in range(self.retry_count): + claim = self.get(claim_name, claim_namespace) + + annotations = claim.metadata.annotations + if annotations is None: + annotations = {} + + if annotation_key in annotations and annotations[annotation_key] == annotation_value: + return + + annotations[annotation_key] = annotation_value + claim.metadata.annotations = annotations + self.core_v1_api.patch_namespaced_persistent_volume_claim( + name=claim_name, + namespace=claim_namespace, + body=claim + ) + + assert False, f"Failed to set annotation {annotation_key} to {annotation_value} for PVC {claim_name}" + + def get_annotation_value(self, claim_name, annotation_key, claim_namespace="default"): + claim = self.get(claim_name, claim_namespace) + return claim.metadata.annotations[annotation_key] + def expand(self, claim_name, size, namespace="default"): try: self.core_v1_api.patch_namespaced_persistent_volume_claim( name=claim_name, namespace=namespace, body={ - 'spec': { - 'resources': { - 'requests': { - 'storage': str(size) - } + 'spec': { + 'resources': { + 'requests': { + 'storage': str(size) } } + } } ) return size diff --git a/e2e/libs/persistentvolumeclaim/persistentvolumeclaim.py b/e2e/libs/persistentvolumeclaim/persistentvolumeclaim.py index bd9e272af1..2e0f972969 100644 --- a/e2e/libs/persistentvolumeclaim/persistentvolumeclaim.py +++ b/e2e/libs/persistentvolumeclaim/persistentvolumeclaim.py @@ -1,7 +1,17 @@ +import time +import yaml + from strategy import LonghornOperationStrategy from persistentvolumeclaim.crd import CRD +from kubernetes import client +from kubernetes.client.rest import ApiException + +from utility.constant import ANNOT_EXPANDED_SIZE +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE +from utility.utility import get_retry_count_and_interval from utility.utility import logging @@ -11,15 +21,74 @@ class PersistentVolumeClaim(): def __init__(self): if self._strategy == LonghornOperationStrategy.CRD: - self.pvc = CRD() + self.claim = CRD() + + def create(self, name, volume_type="RWO", option=""): + filepath = "./templates/workload/pvc.yaml" + with open(filepath, 'r') as f: + namespace = 'default' + manifest_dict = yaml.safe_load(f) + + # correct pvc name + manifest_dict['metadata']['name'] = name + + # add label + manifest_dict['metadata']['labels'][LABEL_TEST] = LABEL_TEST_VALUE + + # correct storageclass name + if option: + manifest_dict['spec']['storageClassName'] += f"-{option}" + + # correct access mode` + if volume_type == 'RWX': + manifest_dict['spec']['accessModes'][0] = 'ReadWriteMany' + api = client.CoreV1Api() + + api.create_namespaced_persistent_volume_claim( + body=manifest_dict, + namespace=namespace) + + def delete(self, name, namespace='default'): + api = client.CoreV1Api() + try: + api.delete_namespaced_persistent_volume_claim( + name=name, + namespace=namespace, + grace_period_seconds=0) + except ApiException as e: + assert e.status == 404 + + retry_count, retry_interval = get_retry_count_and_interval() + for _ in range(retry_count): + resp = api.list_namespaced_persistent_volume_claim(namespace=namespace) + deleted = True + for item in resp.items: + if item.metadata.name == name: + deleted = False + break + if deleted: + break + time.sleep(retry_interval) + assert deleted def get(self, claim_name): - return self.pvc.get(claim_name) + return self.claim.get(claim_name) + + def list(self, claim_namespace="default", label_selector=None): + return self.claim.list(claim_namespace=claim_namespace, + label_selector=label_selector) + + def set_annotation(self, claim_name, annotation_key, annotation_value, claim_namespace="default"): + return self.claim.set_annotation(claim_name, annotation_key, annotation_value, claim_namespace=claim_namespace) + + def get_annotation_value(self, claim_name, annotation_key): + return self.claim.get_annotation_value(claim_name, annotation_key) def expand(self, claim_name, size_in_byte): - pvc = self.pvc.get(claim_name) + pvc = self.claim.get(claim_name) current_size = int(pvc.spec.resources.requests['storage']) target_size = current_size + size_in_byte logging(f"Expanding PVC {claim_name} from {current_size} to {target_size}") - return self.pvc.expand(claim_name, target_size) + expanded_size = self.claim.expand(claim_name, target_size) + self.set_annotation(claim_name, ANNOT_EXPANDED_SIZE, str(expanded_size)) diff --git a/e2e/libs/recurringjob/base.py b/e2e/libs/recurringjob/base.py index 5e4897fbdf..06da5cd528 100644 --- a/e2e/libs/recurringjob/base.py +++ b/e2e/libs/recurringjob/base.py @@ -22,7 +22,3 @@ def add_to_volume(self, job_name, volume_name): @abstractmethod def check_jobs_work(self, volume_name): return NotImplemented - - @abstractmethod - def cleanup(self, volume_names): - return NotImplemented diff --git a/e2e/libs/recurringjob/crd.py b/e2e/libs/recurringjob/crd.py index 54a0ca561d..423751a592 100644 --- a/e2e/libs/recurringjob/crd.py +++ b/e2e/libs/recurringjob/crd.py @@ -3,11 +3,12 @@ from kubernetes import client from recurringjob.base import Base -from recurringjob.rest import Rest - from recurringjob.constant import RETRY_COUNTS from recurringjob.constant import RETRY_INTERVAL +from recurringjob.rest import Rest +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import logging @@ -18,15 +19,16 @@ def __init__(self): self.batch_v1_api = client.BatchV1Api() self.obj_api = client.CustomObjectsApi() - def cleanup(self, recurringjob_names): - for recurringjob_name in recurringjob_names: - self.delete(recurringjob_name) - def create(self, name, task, groups, cron, retain, concurrency, label): body = { "apiVersion": "longhorn.io/v1beta2", "kind": "RecurringJob", - "metadata": {"name": name}, + "metadata": { + "name": name, + "labels": { + LABEL_TEST: LABEL_TEST_VALUE + } + }, "spec": { "name": name, "groups": groups, @@ -70,6 +72,15 @@ def get(self, name): name=name, ) + def list(self, label_selector=None): + return self.obj_api.list_namespaced_custom_object( + group="longhorn.io", + version="v1beta2", + namespace="longhorn-system", + plural="recurringjobs", + label_selector=label_selector + ) + def add_to_volume(self, job_name, volume_name): logging("Delegating the add_to_volume call to API because there is no CRD implementation") return self.rest.add_to_volume(job_name, volume_name) diff --git a/e2e/libs/recurringjob/recurringjob.py b/e2e/libs/recurringjob/recurringjob.py index ea34f717d5..1029e20064 100644 --- a/e2e/libs/recurringjob/recurringjob.py +++ b/e2e/libs/recurringjob/recurringjob.py @@ -25,17 +25,19 @@ def create(self, labels={}): return self.recurringjob.create(job_name, task, groups, cron, retain, concurrency, labels) - def delete(self, job_name, volume_name): - return self.recurringjob.delete(job_name, volume_name) + def delete(self, job_name): + return self.recurringjob.delete(job_name) def get(self, job_name): return self.recurringjob.get(job_name) + def list(self, label_selector=None): + return self.recurringjob.list( + label_selector=label_selector + ) + def add_to_volume(self, job_name, volume_name): return self.recurringjob.add_to_volume(job_name, volume_name) def check_jobs_work(self, volume_name): return self.recurringjob.check_jobs_work(volume_name) - - def cleanup(self, volume_names): - return self.recurringjob.cleanup(volume_names) diff --git a/e2e/libs/recurringjob/rest.py b/e2e/libs/recurringjob/rest.py index 806c49705d..ec35b8ab46 100644 --- a/e2e/libs/recurringjob/rest.py +++ b/e2e/libs/recurringjob/rest.py @@ -5,23 +5,22 @@ from kubernetes import client from recurringjob.base import Base +from recurringjob.constant import RETRY_COUNTS +from recurringjob.constant import RETRY_INTERVAL from utility.utility import filter_cr from utility.utility import get_longhorn_client from utility.utility import logging -from recurringjob.constant import RETRY_COUNTS -from recurringjob.constant import RETRY_INTERVAL - class Rest(Base): def __init__(self): - self.client = get_longhorn_client() + self.longhorn_client = get_longhorn_client() self.batch_v1_api = client.BatchV1Api() def create(self, name, task, groups, cron, retain, concurrency, labels): - self.client.create_recurring_job( + self.longhorn_client.create_recurring_job( Name=name, Task=task, Groups=groups, @@ -32,15 +31,15 @@ def create(self, name, task, groups, cron, retain, concurrency, labels): self._wait_for_cron_job_create(name) def delete(self, job_name, volume_name): - self.client.delete(self.get(job_name)) + self.longhorn_client.delete(self.get(job_name)) self._wait_for_cron_job_delete(job_name) self._wait_for_volume_recurringjob_delete(job_name, volume_name) def get(self, name): - return self.client.by_id_recurring_job(name) + return self.longhorn_client.by_id_recurring_job(name) def add_to_volume(self, job_name, volume_name): - volume = self.client.by_id_volume(volume_name) + volume = self.longhorn_client.by_id_volume(volume_name) volume.recurringJobAdd(name=job_name, isGroup=False) self._wait_for_volume_recurringjob_update(job_name, volume_name) @@ -65,7 +64,7 @@ def _wait_for_volume_recurringjob_delete(self, job_name, volume_name): assert deleted def get_volume_recurringjobs_and_groups(self, volume_name): - volume = self.client.by_id_volume(volume_name) + volume = self.longhorn_client.by_id_volume(volume_name) list = volume.recurringJobList() jobs = [] groups = [] @@ -123,7 +122,7 @@ def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec): label_selector=f"longhornvolume={volume_name}" - max_iterations = period_in_sec * 2 + max_iterations = period_in_sec * 10 for _ in range(max_iterations): time.sleep(1) @@ -137,17 +136,21 @@ def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec): # but job_name is in spec.labels.RecurringJob # and crd doesn't support field selector # so need to filter by ourselves + if item['spec']['labels'] is None: + continue + try: assert item['spec']['labels']['RecurringJob'] == job_name - except Exception as e: + except AssertionError: continue snapshot_timestamp = datetime.strptime(snapshot_list['items'][0]['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ').timestamp() - logging(f"Got snapshot {item['metadata']['name']} timestamp = {snapshot_timestamp}") if snapshot_timestamp > current_timestamp: return + logging(f"Snapshot {item['metadata']['name']} timestamp = {snapshot_timestamp} is not greater than {current_timestamp}") + assert False, f"No new snapshot created by recurringjob {job_name} for {volume_name} since {current_time}" def _check_backup_created_in_time(self, volume_name, period_in_sec): @@ -157,7 +160,7 @@ def _check_backup_created_in_time(self, volume_name, period_in_sec): label_selector=f"backup-volume={volume_name}" - max_iterations = period_in_sec * 2 + max_iterations = period_in_sec * 10 for _ in range(max_iterations): time.sleep(1) @@ -168,11 +171,12 @@ def _check_backup_created_in_time(self, volume_name, period_in_sec): for item in backup_list['items']: backup_timestamp = datetime.strptime(item['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ').timestamp() - logging(f"Got backup {item['metadata']['name']} timestamp = {backup_timestamp}") if backup_timestamp > current_timestamp: return + logging(f"Backup {item['metadata']['name']} timestamp = {backup_timestamp} is not greater than {current_timestamp}") + assert False, f"No new backup created by recurringjob for {volume_name} since {current_time}" def cleanup(self, volume_names): diff --git a/e2e/libs/replica/base.py b/e2e/libs/replica/base.py index 2abca02ca5..a3b6681af6 100644 --- a/e2e/libs/replica/base.py +++ b/e2e/libs/replica/base.py @@ -4,17 +4,17 @@ class Base(ABC): @abstractmethod - def get_replica(self, volume_name, node_name): + def get(self, volume_name, node_name): return NotImplemented @abstractmethod - def delete_replica(self, volume_name, node_name): + def delete(self, volume_name, node_name): return NotImplemented @abstractmethod - def wait_for_replica_rebuilding_start(self, volume_name, node_name): + def wait_for_rebuilding_start(self, volume_name, node_name): return NotImplemented @abstractmethod - def wait_for_replica_rebuilding_complete(self, volume_name, node_name): + def wait_for_rebuilding_complete(self, volume_name, node_name): return NotImplemented diff --git a/e2e/libs/replica/crd.py b/e2e/libs/replica/crd.py index 1d7c02937c..8f2fb6b2ff 100644 --- a/e2e/libs/replica/crd.py +++ b/e2e/libs/replica/crd.py @@ -1,4 +1,4 @@ -from utils.common_utils import k8s_cr_api +from kubernetes import client from replica.base import Base from replica.rest import Rest @@ -8,7 +8,7 @@ class CRD(Base): def __init__(self, node_exec): - self.cr_api = k8s_cr_api() + self.obj_api = client.CustomObjectsApi() self.node_exec = node_exec def get_replica(self, volume_name, node_name): @@ -43,7 +43,7 @@ def delete_replica(self, volume_name, node_name): for replica in replicas: replica_name = replica['metadata']['name'] - k8s_cr_api().delete_namespaced_custom_object( + self.obj_api.delete_namespaced_custom_object( group="longhorn.io", version="v1beta2", namespace="longhorn-system", diff --git a/e2e/libs/replica/replica.py b/e2e/libs/replica/replica.py index 140b6443ee..56ff4fa164 100644 --- a/e2e/libs/replica/replica.py +++ b/e2e/libs/replica/replica.py @@ -13,7 +13,7 @@ def __init__(self, node_exec): self.replica = CRD(node_exec) # delete replicas, if input parameters are empty then will delete all - def delete_replica(self, volume_name="", node_name=""): + def delete(self, volume_name="", node_name=""): return self.replica.delete_replica(volume_name, node_name) def get_replica(self, volume_name, node_name): diff --git a/e2e/libs/replica/rest.py b/e2e/libs/replica/rest.py index 8c492c0b56..ab9bb37cf2 100644 --- a/e2e/libs/replica/rest.py +++ b/e2e/libs/replica/rest.py @@ -1,17 +1,16 @@ import time from replica.base import Base - -from utils import common_utils -from utility.utility import logging - from replica.constant import RETRY_COUNTS from replica.constant import RETRY_INTERVAL +from utility.utility import logging +from utility.utility import get_longhorn_client + class Rest(Base): def __init__(self, node_exec): - self.longhorn_client = common_utils.get_longhorn_client() + self.longhorn_client = get_longhorn_client() self.node_exec = node_exec def get_replica(self, volume_name, node_name): diff --git a/e2e/libs/utility/constant.py b/e2e/libs/utility/constant.py new file mode 100644 index 0000000000..8e1fa904e0 --- /dev/null +++ b/e2e/libs/utility/constant.py @@ -0,0 +1,10 @@ +KIND_DEPLOYMENT = 'deployment' +KIND_STATEFULSET = 'statefulset' + +LABEL_TEST = 'test.longhorn.io' +LABEL_TEST_VALUE = 'e2e' + +ANNOT_CHECKSUM = f'{LABEL_TEST}/last-recorded-checksum' +ANNOT_EXPANDED_SIZE = f'{LABEL_TEST}/last-recorded-expanded-size' + +NAME_PREFIX = 'e2e-test' diff --git a/e2e/libs/utility/utility.py b/e2e/libs/utility/utility.py index fd6d69d746..4a9746c7a5 100644 --- a/e2e/libs/utility/utility.py +++ b/e2e/libs/utility/utility.py @@ -5,6 +5,9 @@ import random import yaml +from robot.api import logger +from robot.libraries.BuiltIn import BuiltIn + from longhorn import from_env from kubernetes import client @@ -12,8 +15,7 @@ from kubernetes import dynamic from kubernetes.client.rest import ApiException -from robot.api import logger -from robot.libraries.BuiltIn import BuiltIn +from utility.constant import NAME_PREFIX def logging(msg, also_report=False): @@ -29,14 +31,14 @@ def get_retry_count_and_interval(): return retry_count, retry_interval -def generate_name(name_prefix="test-"): +def generate_name_random(name_prefix="test-"): return name_prefix + \ ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6)) -def generate_volume_name(): - return generate_name("vol-") +def generate_name_with_suffix(kind, suffix): + return f"{NAME_PREFIX}-{kind}-{suffix}" def init_k8s_api_client(): @@ -54,7 +56,6 @@ def wait_for_cluster_ready(): core_api = client.CoreV1Api() retry_count, retry_interval = get_retry_count_and_interval() for i in range(retry_count): - logging(f"Waiting for cluster ready ({i}) ...") try: resp = core_api.list_node() ready = True @@ -67,6 +68,8 @@ def wait_for_cluster_ready(): break except Exception as e: logging(f"Listing nodes error: {e}") + + logging(f"Waiting for cluster ready ({i}) ...") time.sleep(retry_interval) assert ready, f"expect cluster's ready but it isn't {resp}" @@ -152,7 +155,7 @@ def get_longhorn_client(): longhorn_client = from_env(url=f"{longhorn_client_url}/v1/schemas") return longhorn_client except Exception as e: - logging(f"Getting longhorn client error: {e}") + logging(f"Getting longhorn client error: {e}, retry ({i}) ...") time.sleep(retry_interval) else: logging(f"Initializing longhorn api client from longhorn manager") @@ -170,7 +173,7 @@ def get_longhorn_client(): longhorn_client = from_env(url=f"http://{ip}:9500/v1/schemas") return longhorn_client except Exception as e: - logging(f"Getting longhorn client error: {e}") + logging(f"Getting longhorn client error: {e}, retry ({i}) ...") time.sleep(retry_interval) diff --git a/e2e/libs/volume/base.py b/e2e/libs/volume/base.py index 7684928073..487f877a56 100644 --- a/e2e/libs/volume/base.py +++ b/e2e/libs/volume/base.py @@ -47,6 +47,6 @@ def wait_for_replica_rebuilding_complete(self, volume_name, node_name): def check_data_checksum(self, volume_name, checksum): return NotImplemented - @abstractmethod - def cleanup(self, volume_names): - return NotImplemented + # @abstractmethod + # def cleanup(self, volume_names): + # return NotImplemented diff --git a/e2e/libs/volume/crd.py b/e2e/libs/volume/crd.py index 59b38a7d4d..4ba9b11404 100644 --- a/e2e/libs/volume/crd.py +++ b/e2e/libs/volume/crd.py @@ -2,15 +2,16 @@ from kubernetes import client +from engine import Engine + +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import get_retry_count_and_interval from utility.utility import logging -from engine.engine import Engine - from volume.base import Base -from volume.rest import Rest - from volume.constant import GIBIBYTE +from volume.rest import Rest class CRD(Base): @@ -20,20 +21,16 @@ def __init__(self, node_exec): self.node_exec = node_exec self.retry_count, self.retry_interval = get_retry_count_and_interval() - def get(self, volume_name): - return self.obj_api.get_namespaced_custom_object( - group="longhorn.io", - version="v1beta2", - namespace="longhorn-system", - plural="volumes", - name=volume_name - ) - def create(self, volume_name, size, replica_count): body = { "apiVersion": "longhorn.io/v1beta2", "kind": "Volume", - "metadata": {"name": volume_name}, + "metadata": { + "name": volume_name, + "labels": { + LABEL_TEST: LABEL_TEST_VALUE + } + }, "spec": { "frontend": "blockdev", "replicaAutoBalance": "ignored", @@ -50,6 +47,19 @@ def create(self, volume_name, size, replica_count): ) self.wait_for_volume_state(volume_name, "detached") + def delete(self, volume_name): + try: + self.obj_api.delete_namespaced_custom_object( + group="longhorn.io", + version="v1beta2", + namespace="longhorn-system", + plural="volumes", + name=volume_name + ) + self.wait_for_volume_delete(volume_name) + except Exception as e: + logging(f"Deleting volume error: {e}") + def attach(self, volume_name, node_name): self.obj_api.patch_namespaced_custom_object( group="longhorn.io", @@ -130,18 +140,51 @@ def detach(self, volume_name): self.wait_for_volume_state(volume_name, "detached") - def delete(self, volume_name): - try: - self.obj_api.delete_namespaced_custom_object( - group="longhorn.io", - version="v1beta2", - namespace="longhorn-system", - plural="volumes", - name=volume_name - ) - self.wait_for_volume_delete(volume_name) - except Exception as e: - logging(f"Deleting volume error: {e}") + def get(self, volume_name): + return self.obj_api.get_namespaced_custom_object( + group="longhorn.io", + version="v1beta2", + namespace="longhorn-system", + plural="volumes", + name=volume_name + ) + + def list(self, label_selector=None): + return self.obj_api.list_namespaced_custom_object( + group="longhorn.io", + version="v1beta2", + namespace="longhorn-system", + plural="volumes", + label_selector=label_selector + ) + + def set_annotation(self, volume_name, annotation_key, annotation_value): + # retry conflict error + for i in range(self.retry_count): + try: + volume = self.get(volume_name) + annotations = volume['metadata'].get('annotations', {}) + annotations[annotation_key] = annotation_value + volume['metadata']['annotations'] = annotations + self.obj_api.replace_namespaced_custom_object( + group="longhorn.io", + version="v1beta2", + namespace="longhorn-system", + plural="volumes", + name=volume_name, + body=volume + ) + break + except Exception as e: + if e.status == 409: + logging(f"Conflict error: {e.body}, retry ({i}) ...") + else: + raise e + time.sleep(self.retry_interval) + + def get_annotation_value(self, volume_name, annotation_key): + volume = self.get(volume_name) + return volume['metadata']['annotations'].get(annotation_key) def wait_for_volume_delete(self, volume_name): for i in range(self.retry_count): @@ -197,18 +240,21 @@ def wait_for_volume_robustness_not(self, volume_name, not_desired_state): def wait_for_volume_expand_to_size(self, volume_name, expected_size): engine = None + engine_current_size = 0 + engine_expected_size = int(expected_size) engine_operation = Engine() for i in range(self.retry_count): - logging(f"Waiting for {volume_name} expand to {expected_size} ({i}) ...") - engine = engine_operation.get_engine_by_volume(self.get(volume_name)) - if int(engine['status']['currentSize']) == expected_size: + engine_current_size = int(engine['status']['currentSize']) + if engine_current_size == engine_expected_size: break + logging(f"Waiting for volume engine expand from {engine_current_size} to {expected_size} ({i}) ...") + time.sleep(self.retry_interval) assert engine is not None - assert int(engine['status']['currentSize']) == expected_size + assert engine_current_size == engine_expected_size def get_endpoint(self, volume_name): logging("Delegating the get_endpoint call to API because there is no CRD implementation") @@ -219,8 +265,7 @@ def write_random_data(self, volume_name, size): endpoint = self.get_endpoint(volume_name) checksum = self.node_exec.issue_cmd( node_name, - f"dd if=/dev/urandom of={endpoint} bs=1M count={size} status=none;\ - md5sum {endpoint} | awk \'{{print $1}}\'") + f"dd if=/dev/urandom of={endpoint} bs=1M count={size} status=none; md5sum {endpoint} | awk \'{{print $1}}\'") return checksum def keep_writing_data(self, volume_name, size): @@ -264,14 +309,7 @@ def wait_for_replica_rebuilding_complete(self, volume_name, node_name): def check_data_checksum(self, volume_name, checksum): node_name = self.get(volume_name)["spec"]["nodeID"] endpoint = self.get_endpoint(volume_name) - _checksum = self.node_exec.issue_cmd( + actual_checksum = self.node_exec.issue_cmd( node_name, f"md5sum {endpoint} | awk \'{{print $1}}\'") - logging(f"Got {endpoint} checksum = {_checksum},\ - expected checksum = {checksum}") - assert _checksum == checksum - - def cleanup(self, volume_names): - for volume_name in volume_names: - logging(f"Deleting volume {volume_name}") - self.delete(volume_name) + assert actual_checksum == checksum diff --git a/e2e/libs/volume/rest.py b/e2e/libs/volume/rest.py index f626714cc3..4679edc41e 100644 --- a/e2e/libs/volume/rest.py +++ b/e2e/libs/volume/rest.py @@ -1,17 +1,16 @@ import os import time -from utility.utility import get_longhorn_client -from utility.utility import logging - from volume.base import Base - from volume.constant import DEV_PATH from volume.constant import RETRY_COUNTS from volume.constant import RETRY_INTERVAL from volume.constant import VOLUME_FRONTEND_BLOCKDEV from volume.constant import VOLUME_FRONTEND_ISCSI +from utility.utility import get_longhorn_client +from utility.utility import logging + class Rest(Base): diff --git a/e2e/libs/volume/volume.py b/e2e/libs/volume/volume.py index d8d81c50aa..c6e7773b04 100644 --- a/e2e/libs/volume/volume.py +++ b/e2e/libs/volume/volume.py @@ -18,20 +18,32 @@ def __init__(self): else: self.volume = Rest(node_exec) - def get(self, volume_name): - return self.volume.get(volume_name) - def create(self, volume_name, size, replica_count): return self.volume.create(volume_name, size, replica_count) + def delete(self, volume_name): + return self.volume.delete(volume_name) + def attach(self, volume_name, node_name): return self.volume.attach(volume_name, node_name) def detach(self, volume_name): return self.volume.detach(volume_name) - def delete(self, volume_name): - return self.volume.delete(volume_name) + def get(self, volume_name): + return self.volume.get(volume_name) + + def list(self, label_selector=None): + return self.volume.list(label_selector=label_selector) + + def list_names(self, label_selector=None): + return [item['metadata']['name'] for item in self.list(label_selector)['items']] + + def set_annotation(self, volume_name, annotation_key, annotation_value): + return self.volume.set_annotation(volume_name, annotation_key, annotation_value) + + def get_annotation_value(self, volume_name, annotation_key): + return self.volume.get_annotation_value(volume_name, annotation_key) def wait_for_volume_state(self, volume_name, desired_state): return self.volume.wait_for_volume_state(volume_name, desired_state) @@ -77,5 +89,3 @@ def wait_for_replica_rebuilding_complete(self, volume_name, node_name): def check_data_checksum(self, volume_name, checksum): return self.volume.check_data_checksum(volume_name, checksum) - def cleanup(self, volume_names): - return self.volume.cleanup(volume_names) diff --git a/e2e/libs/workload/constant.py b/e2e/libs/workload/constant.py index edeb75440f..28ad7de5b9 100644 --- a/e2e/libs/workload/constant.py +++ b/e2e/libs/workload/constant.py @@ -2,4 +2,4 @@ IMAGE_LITMUX = 'litmuschaos/go-runner:latest' IMAGE_UBUNTU = 'ubuntu:16.04' -WAIT_FOR_POD_STABLE_MAX_RETRY = 90 +WAIT_FOR_POD_STABLE_MAX_RETRY = 60 diff --git a/e2e/libs/workload/deployment.py b/e2e/libs/workload/deployment.py index 577924c1d1..970cb9dc8b 100644 --- a/e2e/libs/workload/deployment.py +++ b/e2e/libs/workload/deployment.py @@ -4,23 +4,27 @@ from kubernetes import client from kubernetes.client.rest import ApiException -from utility.utility import get_name_suffix +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import get_retry_count_and_interval -def create_deployment(volume_type, option): +def create_deployment(name, claim_name): filepath = f"./templates/workload/deployment.yaml" with open(filepath, 'r') as f: namespace = 'default' manifest_dict = yaml.safe_load(f) - suffix = get_name_suffix(volume_type, option) + # correct workload name - manifest_dict['metadata']['name'] += suffix - manifest_dict['metadata']['labels']['app'] += suffix - manifest_dict['spec']['selector']['matchLabels']['app'] += suffix - manifest_dict['spec']['template']['metadata']['labels']['app'] += suffix + manifest_dict['metadata']['name'] = name + manifest_dict['metadata']['labels']['app'] = name + manifest_dict['metadata']['labels'][LABEL_TEST] = LABEL_TEST_VALUE + manifest_dict['spec']['selector']['matchLabels']['app'] = name + manifest_dict['spec']['template']['metadata']['labels']['app'] = name + manifest_dict['spec']['template']['metadata']['labels'][LABEL_TEST] = LABEL_TEST_VALUE + # correct claim name - manifest_dict['spec']['template']['spec']['volumes'][0]['persistentVolumeClaim']['claimName'] += suffix + manifest_dict['spec']['template']['spec']['volumes'][0]['persistentVolumeClaim']['claimName'] = claim_name api = client.AppsV1Api() deployment = api.create_namespaced_deployment( @@ -43,8 +47,6 @@ def create_deployment(volume_type, option): assert deployment.status.ready_replicas == replicas - return deployment_name - def delete_deployment(name, namespace='default'): api = client.AppsV1Api() @@ -69,3 +71,11 @@ def delete_deployment(name, namespace='default'): break time.sleep(retry_interval) assert deleted + + +def list_deployments(namespace='default', label_selector=None): + api = client.AppsV1Api() + return api.list_namespaced_deployment( + namespace=namespace, + label_selector=label_selector + ) diff --git a/e2e/libs/workload/persistentvolumeclaim.py b/e2e/libs/workload/persistentvolumeclaim.py deleted file mode 100644 index ac954b7502..0000000000 --- a/e2e/libs/workload/persistentvolumeclaim.py +++ /dev/null @@ -1,55 +0,0 @@ -import time -import yaml - -from kubernetes import client -from kubernetes.client.rest import ApiException - -from utility.utility import get_name_suffix -from utility.utility import get_retry_count_and_interval - - -def create_persistentvolumeclaim(volume_type, option): - filepath = "./templates/workload/pvc.yaml" - with open(filepath, 'r') as f: - namespace = 'default' - manifest_dict = yaml.safe_load(f) - suffix = get_name_suffix(volume_type, option) - # correct pvc name - manifest_dict['metadata']['name'] += suffix - # correct storageclass name - if option: - manifest_dict['spec']['storageClassName'] += f"-{option}" - # correct access mode` - if volume_type == 'rwx': - manifest_dict['spec']['accessModes'][0] = 'ReadWriteMany' - api = client.CoreV1Api() - - pvc = api.create_namespaced_persistent_volume_claim( - body=manifest_dict, - namespace=namespace) - - return pvc.metadata.name - - -def delete_persistentvolumeclaim(name, namespace='default'): - api = client.CoreV1Api() - try: - api.delete_namespaced_persistent_volume_claim( - name=name, - namespace=namespace, - grace_period_seconds=0) - except ApiException as e: - assert e.status == 404 - - retry_count, retry_interval = get_retry_count_and_interval() - for _ in range(retry_count): - resp = api.list_namespaced_persistent_volume_claim(namespace=namespace) - deleted = True - for item in resp.items: - if item.metadata.name == name: - deleted = False - break - if deleted: - break - time.sleep(retry_interval) - assert deleted diff --git a/e2e/libs/workload/pod.py b/e2e/libs/workload/pod.py index 30f56932ea..d0f3aecf69 100644 --- a/e2e/libs/workload/pod.py +++ b/e2e/libs/workload/pod.py @@ -4,7 +4,7 @@ from kubernetes.client import rest from utility.utility import logging -from utility.utility import generate_name +from utility.utility import generate_name_random from utility.utility import get_retry_count_and_interval from workload.constant import IMAGE_BUSYBOX @@ -13,7 +13,7 @@ def new_pod_manifest(pod_name="", image="", command=[], args=[], claim_name="", node_name="", labels={}): if pod_name == "": - pod_name = generate_name() + pod_name = generate_name_random() # Set default image and args if image is None: @@ -144,3 +144,17 @@ def wait_for_pod_status(name, status, namespace='default'): time.sleep(retry_interval) assert is_running + + +def get_volume_name_by_pod(name, namespace='default'): + pod = get_pod(name, namespace) + claim_name = "" + for volume in pod.spec.volumes: + if volume.name == 'pod-data': + claim_name = volume.persistent_volume_claim.claim_name + break + assert claim_name, f"Failed to get claim name for pod {pod.metadata.name}" + + api = client.CoreV1Api() + claim = api.read_namespaced_persistent_volume_claim(name=claim_name, namespace='default') + return claim.spec.volume_name diff --git a/e2e/libs/workload/statefulset.py b/e2e/libs/workload/statefulset.py index 58368ab3b9..4c21c827e1 100644 --- a/e2e/libs/workload/statefulset.py +++ b/e2e/libs/workload/statefulset.py @@ -4,30 +4,34 @@ from kubernetes import client from kubernetes.client.rest import ApiException -from utility.utility import get_name_suffix +from utility.constant import LABEL_TEST +from utility.constant import LABEL_TEST_VALUE from utility.utility import get_retry_count_and_interval from utility.utility import logging -def create_statefulset(volume_type, option): +def create_statefulset(statefulset_name, volume_type, option): filepath = "./templates/workload/statefulset.yaml" with open(filepath, 'r') as f: namespace = 'default' manifest_dict = yaml.safe_load(f) - suffix = get_name_suffix(volume_type, option) + # correct workload name - manifest_dict['metadata']['name'] += suffix - manifest_dict['spec']['selector']['matchLabels']['app'] += suffix - manifest_dict['spec']['serviceName'] += suffix - manifest_dict['spec']['template']['metadata']['labels']['app'] += suffix + manifest_dict['metadata']['name'] = statefulset_name + manifest_dict['metadata']['labels'][LABEL_TEST] = LABEL_TEST_VALUE + manifest_dict['spec']['selector']['matchLabels']['app'] = statefulset_name + manifest_dict['spec']['serviceName'] = statefulset_name + manifest_dict['spec']['template']['metadata']['labels']['app'] = statefulset_name + # correct storageclass name if option: manifest_dict['spec']['volumeClaimTemplates'][0]['spec']['storageClassName'] += f"-{option}" + # correct access mode` - if volume_type == 'rwx': + if volume_type == 'RWX': manifest_dict['spec']['volumeClaimTemplates'][0]['spec']['accessModes'][0] = 'ReadWriteMany' - api = client.AppsV1Api() + api = client.AppsV1Api() statefulset = api.create_namespaced_stateful_set( body=manifest_dict, namespace=namespace) @@ -37,8 +41,6 @@ def create_statefulset(volume_type, option): wait_for_statefulset_replicas_ready(statefulset_name, replicas) - return statefulset_name - def wait_for_statefulset_replicas_ready(statefulset_name, expected_ready_count, namespace='default'): apps_v1_api = client.AppsV1Api() @@ -56,7 +58,10 @@ def wait_for_statefulset_replicas_ready(statefulset_name, expected_ready_count, break time.sleep(retry_interval) - assert statefulset.status.ready_replicas == expected_ready_count + assert statefulset.status.ready_replicas == expected_ready_count, \ + f"Unexpected statefulset {statefulset_name} ready replicas:\n" \ + f"GOT: {statefulset.status.ready_replicas}\n" \ + f"EXPECT: {expected_ready_count}" def delete_statefulset(name, namespace='default'): @@ -89,6 +94,15 @@ def get_statefulset(name, namespace='default'): return api.read_namespaced_stateful_set(name=name, namespace=namespace) + +def list_statefulsets(namespace='default', label_selector=None): + api = client.AppsV1Api() + return api.list_namespaced_stateful_set( + namespace=namespace, + label_selector=label_selector + ) + + def scale_statefulset(name, replica_count, namespace='default'): logging(f"Scaling statefulset {name} to {replica_count}") @@ -102,3 +116,22 @@ def scale_statefulset(name, replica_count, namespace='default'): statefulset = get_statefulset(name, namespace) assert statefulset.spec.replicas == int(replica_count) + + +def add_or_update_statefulset_annotation(name, annotation_key, annotation_value, namespace="default"): + statefulset = get_statefulset(name, namespace) + + annotations = statefulset.metadata.annotations + annotations[annotation_key] = annotation_value + statefulset.metadata.annotations = annotations + + api = client.AppsV1Api() + api.patch_namespaced_persistent_volume_claim( + name=name, + namespace=namespace, + body=statefulset + ) + +def get_statefulset_annotation_value(name, annotation_key, namespace="default"): + statefulset = get_statefulset(name, namespace) + return statefulset['metadata']['annotations'].get(annotation_key) diff --git a/e2e/libs/workload/workload.py b/e2e/libs/workload/workload.py index ceff66990b..9fe3768f61 100644 --- a/e2e/libs/workload/workload.py +++ b/e2e/libs/workload/workload.py @@ -5,13 +5,13 @@ from kubernetes.client.rest import ApiException from kubernetes.stream import stream -from utility.utility import get_name_suffix from utility.utility import get_retry_count_and_interval from utility.utility import logging from workload.constant import WAIT_FOR_POD_STABLE_MAX_RETRY + def create_storageclass(name): if name == 'longhorn-test-strict-local': filepath = "./templates/workload/strict_local_storageclass.yaml" @@ -44,45 +44,55 @@ def get_workload_pod_names(workload_name): return pod_names -def get_workload_pods(workload_name): +def get_workload_pods(workload_name, namespace="default"): api = client.CoreV1Api() label_selector = f"app={workload_name}" resp = api.list_namespaced_pod( - namespace="default", + namespace=namespace, label_selector=label_selector) return resp.items def get_workload_volume_name(workload_name): api = client.CoreV1Api() - pvc_name = get_workload_pvc_name(workload_name) - pvc = api.read_namespaced_persistent_volume_claim( - name=pvc_name, namespace='default') - return pvc.spec.volume_name + claim_name = get_workload_persistent_volume_claim_name(workload_name) + claim = api.read_namespaced_persistent_volume_claim( + name=claim_name, namespace='default') + return claim.spec.volume_name -def get_workload_pvc_name(workload_name): - pod = get_workload_pods(workload_name)[0] - logging(f"Got pod {pod.metadata.name} for workload {workload_name}") - for volume in pod.spec.volumes: - if volume.name == 'pod-data': - pvc_name = volume.persistent_volume_claim.claim_name - break - assert pvc_name - return pvc_name +def get_workload_persistent_volume_claim_name(workload_name, index=0): + return get_workload_persistent_volume_claim_names(workload_name)[int(index)] -def write_pod_random_data(pod_name, size_in_mb, path="/data/random-data"): +def get_workload_persistent_volume_claim_names(workload_name, namespace="default"): + claim_names = [] api = client.CoreV1Api() - write_cmd = [ + label_selector = f"app={workload_name}" + claim = api.list_namespaced_persistent_volume_claim( + namespace=namespace, + label_selector=label_selector + ) + + for item in claim.items: + claim_names.append(item.metadata.name) + + assert len(claim_names) > 0, f"Failed to get PVC names for workload {workload_name}" + return claim_names + + +def write_pod_random_data(pod_name, size_in_mb, file_name, + data_directory="/data", ): + data_path = f"{data_directory}/{file_name}" + api = client.CoreV1Api() + write_data_cmd = [ '/bin/sh', '-c', - f"dd if=/dev/urandom of={path} bs=1M count={size_in_mb} status=none;\ - md5sum {path} | awk \'{{print $1}}\'" + f"dd if=/dev/urandom of={data_path} bs=1M count={size_in_mb} status=none; echo `md5sum {data_path} | awk \'{{print $1}}\'`" ] return stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', - command=write_cmd, stderr=True, stdin=False, stdout=True, + command=write_data_cmd, stderr=True, stdin=False, stdout=True, tty=False) @@ -93,49 +103,84 @@ def keep_writing_pod_data(pod_name, size_in_mb=256, path="/data/overwritten-data '-c', f"while true; do dd if=/dev/urandom of={path} bs=1M count={size_in_mb} status=none; done > /dev/null 2> /dev/null &" ] - logging(f"Keep writing pod {pod_name}") + + logging(f"Creating process to keep writing data to pod {pod_name}") res = stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', command=write_cmd, stderr=True, stdin=False, stdout=True, tty=False) - logging(f"Created process to keep writing pod {pod_name}") - return res + assert res == "", f"Failed to create process to keep writing data to pod {pod_name}" -def check_pod_data_checksum(pod_name, checksum, path="/data/random-data"): - logging(f"Checking pod {pod_name} data checksum") +def check_pod_data_checksum(expected_checksum, pod_name, file_name, data_directory="/data"): + file_path = f"{data_directory}/{file_name}" api = client.CoreV1Api() - cmd = [ + cmd_get_file_checksum = [ '/bin/sh', '-c', - f"md5sum {path} | awk \'{{print $1}}\'" + f"md5sum {file_path} | awk \'{{print $1}}\'" ] - _checksum = stream( + actual_checksum = stream( api.connect_get_namespaced_pod_exec, pod_name, 'default', - command=cmd, stderr=True, stdin=False, stdout=True, + command=cmd_get_file_checksum, stderr=True, stdin=False, stdout=True, tty=False) - assert _checksum == checksum, \ - f"Got {path} checksum = {_checksum}\n" \ - f"Expected checksum = {checksum}" + assert actual_checksum == expected_checksum, \ + f"Got {file_path} checksum = {actual_checksum}\n" \ + f"Expected checksum = {expected_checksum}" + + +def wait_for_workload_pods_running(workload_name, namespace="default"): + retry_count, retry_interval = get_retry_count_and_interval() + for i in range(retry_count): + pods = get_workload_pods(workload_name, namespace=namespace) + if len(pods) > 0: + running_pods = [] + for pod in pods: + if pod.status.phase == "Running": + running_pods.append(pod.metadata.name) + if len(running_pods) == len(pods): + return + + logging(f"Waiting for {workload_name} pods {running_pods} running, retry ({i}) ...") + time.sleep(retry_interval) + + assert False, f"Timeout waiting for {workload_name} pods running" + + +def wait_for_workload_pods_stable(workload_name, namespace="default"): + stable_pods = {} + wait_for_stable_retry = {} -def wait_for_workload_pod_stable(workload_name): - stable_pod = None - wait_for_stable_retry = 0 retry_count, retry_interval = get_retry_count_and_interval() for i in range(retry_count): - logging(f"Waiting for {workload_name} pod stable ({i}) ...") - pods = get_workload_pods(workload_name) + pods = get_workload_pods(workload_name, namespace=namespace) + assert len(pods) > 0 + for pod in pods: + pod_name = pod.metadata.name if pod.status.phase == "Running": - if stable_pod is None or \ - stable_pod.status.start_time != pod.status.start_time: - stable_pod = pod - wait_for_stable_retry = 0 - break + if pod_name not in stable_pods or \ + stable_pods[pod_name].status.start_time != pod.status.start_time: + stable_pods[pod_name] = pod + wait_for_stable_retry[pod_name] = 0 else: - wait_for_stable_retry += 1 - if wait_for_stable_retry == WAIT_FOR_POD_STABLE_MAX_RETRY: - return stable_pod + wait_for_stable_retry[pod_name] += 1 + + wait_for_stable_pod = [] + for pod in pods: + if pod.status.phase != "Running": + wait_for_stable_pod.append(pod.metadata.name) + continue + + pod_name = pod.metadata.name + if wait_for_stable_retry[pod_name] != WAIT_FOR_POD_STABLE_MAX_RETRY: + wait_for_stable_pod.append(pod_name) + + if len(wait_for_stable_pod) == 0: + return + + logging(f"Waiting for {workload_name} pods {wait_for_stable_pod} stable, retry ({i}) ...") time.sleep(retry_interval) - assert False + + assert False, f"Timeout waiting for {workload_name} pods {wait_for_stable_pod} stable)" diff --git a/e2e/templates/workload/pvc.yaml b/e2e/templates/workload/pvc.yaml index f42eef2b2c..89a16fb08e 100644 --- a/e2e/templates/workload/pvc.yaml +++ b/e2e/templates/workload/pvc.yaml @@ -3,6 +3,8 @@ kind: PersistentVolumeClaim metadata: name: test-pvc namespace: default + labels: + test.longhorn.io: e2e spec: accessModes: - ReadWriteOnce diff --git a/e2e/templates/workload/statefulset.yaml b/e2e/templates/workload/statefulset.yaml index 71d36a5aad..6807afc0ee 100644 --- a/e2e/templates/workload/statefulset.yaml +++ b/e2e/templates/workload/statefulset.yaml @@ -4,6 +4,8 @@ kind: StatefulSet metadata: name: test-statefulset namespace: default + labels: + test.longhorn.io: e2e spec: selector: matchLabels: