diff --git a/e2e/libs/keywords/node_instance_keywords.py b/e2e/libs/keywords/host_keywords.py
similarity index 75%
rename from e2e/libs/keywords/node_instance_keywords.py
rename to e2e/libs/keywords/host_keywords.py
index 268d6cc1f1..9387d9249a 100644
--- a/e2e/libs/keywords/node_instance_keywords.py
+++ b/e2e/libs/keywords/host_keywords.py
@@ -1,51 +1,52 @@
 from robot.libraries.BuiltIn import BuiltIn
 
+from host import Host
+from host.constant import NODE_REBOOT_DOWN_TIME_SECOND
+
 from node import Node
-from node import NodeInstance
-from node.constant import NODE_REBOOT_DOWN_TIME_SECOND
 
 from utility.utility import logging
 
 
-class node_instance_keywords:
+class host_keywords:
 
     def __init__(self):
         self.volume_keywords = BuiltIn().get_library_instance('volume_keywords')
 
+        self.host = Host()
         self.node = Node()
-        self.nodeInstance = NodeInstance()
 
     def reboot_volume_node(self, volume_name):
         node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node")
 
         logging(f'Rebooting volume {volume_name} node {node_id} with downtime {NODE_REBOOT_DOWN_TIME_SECOND} seconds')
-        self.nodeInstance.reboot_node(node_id)
+        self.host.reboot_node(node_id)
 
     def reboot_replica_node(self, volume_name):
         node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "replica node")
 
         logging(f'Rebooting volume {volume_name} node {node_id} with downtime {NODE_REBOOT_DOWN_TIME_SECOND} seconds')
-        self.nodeInstance.reboot_node(node_id)
+        self.host.reboot_node(node_id)
 
     def reboot_node_by_index(self, idx, power_off_time_in_min=1):
         node_name = self.node.get_node_by_index(idx)
         reboot_down_time_sec = int(power_off_time_in_min) * 60
 
         logging(f'Rebooting node {node_name} with downtime {reboot_down_time_sec} seconds')
-        self.nodeInstance.reboot_node(node_name, reboot_down_time_sec)
+        self.host.reboot_node(node_name, reboot_down_time_sec)
 
     def reboot_all_worker_nodes(self, power_off_time_in_min=1):
         reboot_down_time_sec = int(power_off_time_in_min) * 60
 
         logging(f'Rebooting all worker nodes with downtime {reboot_down_time_sec} seconds')
-        self.nodeInstance.reboot_all_worker_nodes(reboot_down_time_sec)
+        self.host.reboot_all_worker_nodes(reboot_down_time_sec)
 
     def reboot_all_nodes(self):
         logging(f'Rebooting all nodes with downtime {NODE_REBOOT_DOWN_TIME_SECOND} seconds')
-        self.nodeInstance.reboot_all_nodes()
+        self.host.reboot_all_nodes()
 
     def reboot_node_by_name(self, node_name, downtime_in_min=1):
         reboot_down_time_sec = int(downtime_in_min) * 60
 
         logging(f'Rebooting node {node_name} with downtime {reboot_down_time_sec} seconds')
-        self.nodeInstance.reboot_node(node_name, reboot_down_time_sec)
+        self.host.reboot_node(node_name, reboot_down_time_sec)
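The reboot keywords above now delegate to the new `host` package, which this diff does not include (only the call sites are visible). A minimal sketch of what it presumably provides, inferred from those call sites and from the `NodeInstance` class deleted later in this diff; module layout and signatures are assumptions:

```python
# e2e/libs/host/constant.py (assumed): the default downtime, moved out of
# node/constant.py (its deletion there appears below).
NODE_REBOOT_DOWN_TIME_SECOND = 60


# e2e/libs/host/__init__.py (assumed): Host exposes the same reboot API as
# the deleted node.node_instance.NodeInstance class.
class Host:
    def reboot_node(self, node_name, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND): ...
    def reboot_all_nodes(self, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND): ...
    def reboot_all_worker_nodes(self, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND): ...
```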
diff --git a/e2e/libs/keywords/workload_keywords.py b/e2e/libs/keywords/workload_keywords.py
index 16f683e1e2..6316691880 100644
--- a/e2e/libs/keywords/workload_keywords.py
+++ b/e2e/libs/keywords/workload_keywords.py
@@ -2,7 +2,7 @@
 
 from deployment_keywords import deployment_keywords
 from kubelet_keywords import kubelet_keywords
-from node_instance_keywords import node_instance_keywords
+from host_keywords import host_keywords
 from statefulset_keywords import statefulset_keywords
 from volume_keywords import volume_keywords
 
@@ -17,6 +17,7 @@
 from workload.workload import get_workload_volume_name
 from workload.workload import keep_writing_pod_data
 from workload.workload import write_pod_random_data
+from workload.workload import wait_for_workload_pods_running
 from workload.workload import wait_for_workload_pods_stable
 
 from utility.constant import ANNOT_CHECKSUM
@@ -32,7 +33,7 @@ class workload_keywords:
 
     def __init__(self):
         self.deployment_keywords = deployment_keywords()
         self.kubelet_keywords = kubelet_keywords()
-        self.node_instance_keywords = node_instance_keywords()
+        self.host_keywords = host_keywords()
         self.statefulset_keywords = statefulset_keywords()
         self.volume_keywords = volume_keywords()
@@ -86,24 +87,28 @@ def keep_writing_workload_pod_data(self, workload_name):
 
     def reboot_workload_volume_node(self, workload_name, downtime_in_min=1):
         volume_name = get_workload_volume_name(workload_name)
         node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node")
-        self.node_instance_keywords.reboot_node_by_name(node_id, downtime_in_min=downtime_in_min)
+        self.host_keywords.reboot_node_by_name(node_id, downtime_in_min=downtime_in_min)
 
     def restart_workload_kubelet(self, workload_name, downtime_in_sec):
         volume_name = get_workload_volume_name(workload_name)
         node_id = self.volume_keywords.get_node_id_by_replica_locality(volume_name, "volume node")
         self.kubelet_keywords.restart_kubelet(node_id, downtime_in_sec)
 
-    def wait_for_workload_pods_stable(self, workload_name, namespace="default"):
-        logging(f'Waiting for {namespace} workload {workload_name} pod stable')
-        wait_for_workload_pods_stable(workload_name, namespace=namespace)
+    def wait_for_workload_pods_running(self, workload_name, namespace="default"):
+        logging(f'Waiting for {namespace} workload {workload_name} pods running')
+        wait_for_workload_pods_running(workload_name, namespace=namespace)
 
-    def wait_for_workloads_pods_stable(self, workload_names, namespace="default"):
-        logging(f'Waiting for {namespace} workloads {workload_names} pods stable')
+    def wait_for_workloads_pods_running(self, workload_names, namespace="default"):
+        logging(f'Waiting for {namespace} workloads {workload_names} pods running')
         with multiprocessing.Pool(processes=len(workload_names)) as pool:
-            pool.starmap(wait_for_workload_pods_stable, [(name, namespace) for name in workload_names])
+            pool.starmap(wait_for_workload_pods_running, [(name, namespace) for name in workload_names])
 
         pool.join()
 
+    def wait_for_workload_pods_stable(self, workload_name, namespace="default"):
+        logging(f'Waiting for {namespace} workload {workload_name} pod stable')
+        wait_for_workload_pods_stable(workload_name, namespace=namespace)
+
     def wait_for_workload_volume_healthy(self, workload_name):
         volume_name = get_workload_volume_name(workload_name)
diff --git a/e2e/libs/node/__init__.py b/e2e/libs/node/__init__.py
index 93b52da04d..6b60b2f5a4 100644
--- a/e2e/libs/node/__init__.py
+++ b/e2e/libs/node/__init__.py
@@ -1,3 +1,2 @@
 from node.node import Node
-from node.node_instance import NodeInstance
 from node.stress import Stress
diff --git a/e2e/libs/node/constant.py b/e2e/libs/node/constant.py
index 8c8eaecccb..b7dc738512 100644
--- a/e2e/libs/node/constant.py
+++ b/e2e/libs/node/constant.py
@@ -1,5 +1,3 @@
-NODE_REBOOT_DOWN_TIME_SECOND = 60
-
 NODE_STRESS_CPU_LOAD_PERCENTAGE = 100
 NODE_STRESS_MEM_LOAD_PERCENTAGE = 100
 NODE_STRESS_MEM_VM_WORKERS = 1
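`wait_for_workloads_pods_running` fans the per-workload wait out over a `multiprocessing.Pool`, one process per workload; `starmap` blocks until every `(name, namespace)` pair has completed, so the keyword returns only when all workloads are running or a worker hits its retry limit. A self-contained sketch of that pattern, with a stand-in wait function:

```python
# Fan-out wait pattern, as used by wait_for_workloads_pods_running above.
# wait_one stands in for workload.workload.wait_for_workload_pods_running.
import multiprocessing

def wait_one(name, namespace):
    print(f"waiting for {namespace}/{name}")  # the real code polls pod phases

def wait_all(names, namespace="default"):
    with multiprocessing.Pool(processes=len(names)) as pool:
        # Blocks until every workload has been waited on.
        pool.starmap(wait_one, [(n, namespace) for n in names])

if __name__ == "__main__":
    wait_all(["web", "db"])
```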
diff --git a/e2e/libs/node/node_instance.py b/e2e/libs/node/node_instance.py
deleted file mode 100644
index c017aa162c..0000000000
--- a/e2e/libs/node/node_instance.py
+++ /dev/null
@@ -1,73 +0,0 @@
-import boto3
-import time
-import yaml
-
-from node.constant import NODE_REBOOT_DOWN_TIME_SECOND
-
-from utility.utility import logging
-from utility.utility import wait_for_cluster_ready
-
-
-class NodeInstance:
-
-    def __init__(self):
-        with open('/tmp/instance_mapping', 'r') as f:
-            self.mapping = yaml.safe_load(f)
-        self.aws_client = boto3.client('ec2')
-
-    def reboot_all_nodes(self, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND):
-        instance_ids = [value for value in self.mapping.values()]
-
-        resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True)
-        assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, f"Failed to stop instances {instance_ids} response: {resp}"
-        logging(f"Stopping instances {instance_ids}")
-        waiter = self.aws_client.get_waiter('instance_stopped')
-        waiter.wait(InstanceIds=instance_ids)
-
-        logging(f"Wait for {shut_down_time_in_sec} seconds before starting instances")
-        time.sleep(shut_down_time_in_sec)
-
-        resp = self.aws_client.start_instances(InstanceIds=instance_ids)
-        logging(f"Starting instances {instance_ids} response: {resp}")
-        waiter = self.aws_client.get_waiter('instance_running')
-        waiter.wait(InstanceIds=instance_ids)
-
-        wait_for_cluster_ready()
-
-        logging(f"Started instances")
-
-    def reboot_node(self, reboot_node_name, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND):
-        instance_ids = [self.mapping[reboot_node_name]]
-
-        resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True)
-        assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, f"Failed to stop instances {instance_ids} response: {resp}"
-        logging(f"Stopping instances {instance_ids}")
-        waiter = self.aws_client.get_waiter('instance_stopped')
-        waiter.wait(InstanceIds=instance_ids)
-        logging(f"Stopped instances")
-
-        time.sleep(shut_down_time_in_sec)
-
-        resp = self.aws_client.start_instances(InstanceIds=instance_ids)
-        logging(f"Starting instances {instance_ids} response: {resp}")
-        waiter = self.aws_client.get_waiter('instance_running')
-        waiter.wait(InstanceIds=instance_ids)
-        logging(f"Started instances")
-
-    def reboot_all_worker_nodes(self, shut_down_time_in_sec=NODE_REBOOT_DOWN_TIME_SECOND):
-        instance_ids = [self.mapping[value] for value in self.list_node_names_by_role("worker")]
-
-        resp = self.aws_client.stop_instances(InstanceIds=instance_ids, Force=True)
-        assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, f"Failed to stop instances {instance_ids} response: {resp}"
-        logging(f"Stopping instances {instance_ids}")
-        waiter = self.aws_client.get_waiter('instance_stopped')
-        waiter.wait(InstanceIds=instance_ids)
-        logging(f"Stopped instances")
-
-        time.sleep(shut_down_time_in_sec)
-
-        resp = self.aws_client.start_instances(InstanceIds=instance_ids)
-        logging(f"Starting instances {instance_ids} response: {resp}")
-        waiter = self.aws_client.get_waiter('instance_running')
-        waiter.wait(InstanceIds=instance_ids)
-        logging(f"Started instances")
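The three deleted methods repeat the same stop, wait-stopped, sleep, start, wait-running sequence against EC2 (note also that the deleted `reboot_all_worker_nodes` calls `self.list_node_names_by_role`, which `NodeInstance` never defined; the replacement `Host` class presumably supplies it). A factored-out helper for that shared sequence might look like the following; this is a sketch, not part of the diff, and assumes boto3 credentials are configured:

```python
# Shared EC2 power-cycle sequence from the deleted reboot_* methods.
import time
import boto3

def power_cycle(instance_ids, down_time_sec=60):
    ec2 = boto3.client('ec2')

    resp = ec2.stop_instances(InstanceIds=instance_ids, Force=True)
    assert resp['ResponseMetadata']['HTTPStatusCode'] == 200, resp
    ec2.get_waiter('instance_stopped').wait(InstanceIds=instance_ids)

    time.sleep(down_time_sec)  # keep the hosts down for the requested window

    ec2.start_instances(InstanceIds=instance_ids)
    ec2.get_waiter('instance_running').wait(InstanceIds=instance_ids)
```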
diff --git a/e2e/libs/persistentvolumeclaim/crd.py b/e2e/libs/persistentvolumeclaim/crd.py
index 47f6ea352c..13df4deb64 100644
--- a/e2e/libs/persistentvolumeclaim/crd.py
+++ b/e2e/libs/persistentvolumeclaim/crd.py
@@ -23,16 +23,25 @@ def list(self, claim_namespace="default", label_selector=None):
         )
 
     def set_annotation(self, claim_name, annotation_key, annotation_value, claim_namespace="default"):
-        claim = self.get(claim_name, claim_namespace)
+        for _ in range(self.retry_count):
+            claim = self.get(claim_name, claim_namespace)
 
-        annotations = claim.metadata.annotations
-        annotations[annotation_key] = annotation_value
-        claim.metadata.annotations = annotations
-        self.core_v1_api.patch_namespaced_persistent_volume_claim(
-            name=claim_name,
-            namespace=claim_namespace,
-            body=claim
-        )
+            annotations = claim.metadata.annotations
+            if annotations is None:
+                annotations = {}
+
+            if annotations.get(annotation_key) == annotation_value:
+                return
+
+            annotations[annotation_key] = annotation_value
+            claim.metadata.annotations = annotations
+            self.core_v1_api.patch_namespaced_persistent_volume_claim(
+                name=claim_name,
+                namespace=claim_namespace,
+                body=claim
+            )
+
+        assert False, f"Failed to set annotation {annotation_key} to {annotation_value} for PVC {claim_name}"
 
     def get_annotation_value(self, claim_name, annotation_key, claim_namespace="default"):
         claim = self.get(claim_name, claim_namespace)
diff --git a/e2e/libs/volume/crd.py b/e2e/libs/volume/crd.py
index 17558f9b0e..4ba9b11404 100644
--- a/e2e/libs/volume/crd.py
+++ b/e2e/libs/volume/crd.py
@@ -177,7 +177,7 @@ def set_annotation(self, volume_name, annotation_key, annotation_value):
                 break
             except Exception as e:
                 if e.status == 409:
-                    logging(f"Conflict error: {e.body.get('message')}, retry ({i}) ...")
+                    logging(f"Conflict error: {e.body}, retry ({i}) ...")
                 else:
                     raise e
             time.sleep(self.retry_interval)
diff --git a/e2e/libs/workload/workload.py b/e2e/libs/workload/workload.py
index f418a68f1d..9fe3768f61 100644
--- a/e2e/libs/workload/workload.py
+++ b/e2e/libs/workload/workload.py
@@ -130,6 +130,24 @@ def check_pod_data_checksum(expected_checksum, pod_name, file_name, data_directo
         f"Expected checksum = {expected_checksum}"
 
 
+def wait_for_workload_pods_running(workload_name, namespace="default"):
+    retry_count, retry_interval = get_retry_count_and_interval()
+    for i in range(retry_count):
+        pods = get_workload_pods(workload_name, namespace=namespace)
+        running_pods = []
+        if len(pods) > 0:
+            for pod in pods:
+                if pod.status.phase == "Running":
+                    running_pods.append(pod.metadata.name)
+            if len(running_pods) == len(pods):
+                return
+
+        logging(f"Waiting for {workload_name} pods {running_pods} running, retry ({i}) ...")
+        time.sleep(retry_interval)
+
+    assert False, f"Timeout waiting for {workload_name} pods running"
+
+
 def wait_for_workload_pods_stable(workload_name, namespace="default"):
     stable_pods = {}
     wait_for_stable_retry = {}
@@ -151,6 +169,10 @@ def wait_for_workload_pods_stable(workload_name, namespace="default"):
 
             wait_for_stable_pod = []
             for pod in pods:
+                if pod.status.phase != "Running":
+                    wait_for_stable_pod.append(pod.metadata.name)
+                    continue
+
                 pod_name = pod.metadata.name
                 if wait_for_stable_retry[pod_name] != WAIT_FOR_POD_STABLE_MAX_RETRY:
                     wait_for_stable_pod.append(pod_name)
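The new `wait_for_workload_pods_running` is the cheaper of the two waits: it returns as soon as every pod reports phase `Running` once, while `wait_for_workload_pods_stable` additionally requires each pod to be observed `Running` across `WAIT_FOR_POD_STABLE_MAX_RETRY` consecutive polls (the guard added above treats any non-`Running` pod as not yet stable). A simplified side-by-side illustration of the two criteria; the constant's value is assumed, not taken from this diff:

```python
WAIT_FOR_POD_STABLE_MAX_RETRY = 90  # assumed value

def all_running(phases):
    # phases: latest observed phase per pod, e.g. {"pod-a": "Running"}
    return len(phases) > 0 and all(p == "Running" for p in phases.values())

def all_stable(running_streaks):
    # running_streaks: consecutive "Running" observations per pod
    return len(running_streaks) > 0 and all(
        s >= WAIT_FOR_POD_STABLE_MAX_RETRY for s in running_streaks.values())
```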