diff --git a/e2e/keywords/workload.resource b/e2e/keywords/workload.resource index 80452fd292..d84b810df6 100644 --- a/e2e/keywords/workload.resource +++ b/e2e/keywords/workload.resource @@ -2,11 +2,12 @@ Documentation Workload Keywords Library Collections +Library String Library ../libs/keywords/common_keywords.py Library ../libs/keywords/volume_keywords.py Library ../libs/keywords/workload_keywords.py Library ../libs/keywords/host_keywords.py -Library ../libs/keywords/kubelet_keywords.py +Library ../libs/keywords/k8s_keywords.py *** Keywords *** Keep writing data to pod of ${workload_kind} ${workload_id} @@ -23,12 +24,24 @@ Reboot volume node of ${workload_kind} ${workload_id} ${volume_name} = get_workload_volume_name ${workload_name} reboot_volume_node ${volume_name} -When Stop volume node kubelet of ${workload_kind} ${workload_id} for ${duration} seconds +Stop volume node kubelet of ${workload_kind} ${workload_id} for ${duration} seconds ${workload_name} = generate_name_with_suffix ${workload_kind} ${workload_id} ${volume_name} = get_workload_volume_name ${workload_name} ${node_name} = get_volume_node ${volume_name} restart_kubelet ${node_name} ${duration} +Stop volume nodes kubelet for ${duration} seconds + [Arguments] @{args} + @{node_list} = Create List + FOR ${arg} IN @{args} + @{workload} = Split String ${arg} + ${workload_name} = generate_name_with_suffix ${workload}[0] ${workload}[1] + ${volume_name} = get_workload_volume_name ${workload_name} + ${node_name} = get_volume_node ${volume_name} + Append To List ${node_list} ${node_name} + END + restart_kubelet_on_nodes ${duration} ${node_list} + Wait for volume of ${workload_kind} ${workload_id} healthy ${workload_name} = generate_name_with_suffix ${workload_kind} ${workload_id} wait_for_workload_volume_healthy ${workload_name} @@ -57,3 +70,13 @@ Delete replica of ${workload_kind} ${workload_id} volume on ${replica_locality} ${workload_name} = generate_name_with_suffix ${workload_kind} ${workload_id} ${volume_name} = get_workload_volume_name ${workload_name} delete_replica_on_node ${volume_name} ${replica_locality} + +Wait for workloads pods stable + [Arguments] @{args} + @{workload_list} = Create List + FOR ${arg} IN @{args} + @{workload} = Split String ${arg} + ${workload_name} = generate_name_with_suffix ${workload}[0] ${workload}[1] + Append To List ${workload_list} ${workload_name} + END + wait_for_workloads_pods_stably_running ${workload_list} diff --git a/e2e/libs/k8s/k8s.py b/e2e/libs/k8s/k8s.py index 44753552a6..f8e3100029 100644 --- a/e2e/libs/k8s/k8s.py +++ b/e2e/libs/k8s/k8s.py @@ -1,5 +1,6 @@ import time import subprocess +import asyncio from workload.pod import create_pod from workload.pod import delete_pod from workload.pod import new_pod_manifest @@ -7,7 +8,7 @@ from utility.utility import logging -def restart_kubelet(node_name, downtime_in_sec=10): +async def restart_kubelet(node_name, downtime_in_sec=10): manifest = new_pod_manifest( image=IMAGE_UBUNTU, command=["/bin/bash"], @@ -17,7 +18,7 @@ def restart_kubelet(node_name, downtime_in_sec=10): pod_name = manifest['metadata']['name'] create_pod(manifest, is_wait_for_pod_running=True) - time.sleep(downtime_in_sec) + await asyncio.sleep(downtime_in_sec) delete_pod(pod_name) diff --git a/e2e/libs/keywords/k8s_keywords.py b/e2e/libs/keywords/k8s_keywords.py index a4c92fbb91..38fb4db2ef 100644 --- a/e2e/libs/keywords/k8s_keywords.py +++ b/e2e/libs/keywords/k8s_keywords.py @@ -1,12 +1,30 @@ +import asyncio from robot.libraries.BuiltIn import BuiltIn from k8s.k8s import restart_kubelet from k8s.k8s import delete_node +from utility.utility import logging class k8s_keywords: - def restart_kubelet(self, node_name, stop_time_in_sec): - restart_kubelet(node_name, int(stop_time_in_sec)) + async def restart_kubelet(self, node_name, downtime_in_sec): + logging(f'Restarting kubelet on node {node_name} with downtime {downtime_in_sec} seconds') + await restart_kubelet(node_name, int(downtime_in_sec)) + + async def restart_kubelet_on_nodes(self, downtime_in_sec, node_list): + logging(f'Restarting kubelet on nodes {node_list} with downtime {downtime_in_sec} seconds') + + async def restart_kubelet_tasks(): + tasks = [] + for node_name in node_list: + tasks.append( + asyncio.create_task(restart_kubelet(node_name, int(downtime_in_sec)), name=node_name) + ) + + done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + logging(f"All kubelets on nodes {node_list} are restarted after downtime {downtime_in_sec} seconds") + + await restart_kubelet_tasks() def delete_volume_node(self, volume_name): volume_keywords = BuiltIn().get_library_instance('volume_keywords') diff --git a/e2e/libs/keywords/statefulset_keywords.py b/e2e/libs/keywords/statefulset_keywords.py index 4ffba42270..28db25e89f 100644 --- a/e2e/libs/keywords/statefulset_keywords.py +++ b/e2e/libs/keywords/statefulset_keywords.py @@ -14,6 +14,7 @@ from workload.statefulset import list_statefulsets from workload.statefulset import scale_statefulset from workload.statefulset import wait_for_statefulset_replicas_ready +from workload.workload import get_workload_volume_name @@ -49,15 +50,15 @@ def scale_statefulset_down(self, statefulset_name): logging(f'Scaling statefulset {statefulset_name} down') scale_statefulset(statefulset_name, 0) - workload_keywords = BuiltIn().get_library_instance('workload_keywords') - workload_keywords.wait_for_workload_volume_detached(statefulset_name) + volume_name = get_workload_volume_name(statefulset_name) + self.volume.wait_for_volume_detached(volume_name) def scale_statefulset_up(self, statefulset_name, replicaset_count=3): logging(f'Scaling statefulset {statefulset_name} up to {replicaset_count}') scale_statefulset(statefulset_name, replicaset_count) - workload_keywords = BuiltIn().get_library_instance('workload_keywords') - workload_keywords.wait_for_workload_volume_healthy(statefulset_name) + volume_name = get_workload_volume_name(statefulset_name) + self.volume.wait_for_volume_healthy(volume_name) self.wait_for_statefulset_replicas_ready(statefulset_name, replicaset_count) diff --git a/e2e/libs/keywords/workload_keywords.py b/e2e/libs/keywords/workload_keywords.py index 0e49e877f1..c5c4d11144 100644 --- a/e2e/libs/keywords/workload_keywords.py +++ b/e2e/libs/keywords/workload_keywords.py @@ -1,8 +1,5 @@ import multiprocessing - -from deployment_keywords import deployment_keywords -from statefulset_keywords import statefulset_keywords -from volume_keywords import volume_keywords +import asyncio from persistentvolumeclaim import PersistentVolumeClaim @@ -29,10 +26,6 @@ class workload_keywords: def __init__(self): - self.deployment_keywords = deployment_keywords() - self.statefulset_keywords = statefulset_keywords() - self.volume_keywords = volume_keywords() - self.persistentvolumeclaim = PersistentVolumeClaim() self.volume = Volume() @@ -63,13 +56,13 @@ def write_workload_pod_random_data(self, workload_name, size_in_mb, file_name): logging(f'Writing {size_in_mb} MB random data to pod {pod_name}') checksum = write_pod_random_data(pod_name, size_in_mb, file_name) - volume = get_volume_name_by_pod(pod_name) - self.volume_keywords.set_annotation(volume, ANNOT_CHECKSUM, checksum) + volume_name = get_volume_name_by_pod(pod_name) + self.volume.set_annotation(volume_name, ANNOT_CHECKSUM, checksum) def check_workload_pod_data_checksum(self, workload_name, file_name): pod_name = get_workload_pod_names(workload_name)[0] - volume = get_volume_name_by_pod(pod_name) - expected_checksum = self.volume.get_annotation_value(volume, ANNOT_CHECKSUM) + volume_name = get_volume_name_by_pod(pod_name) + expected_checksum = self.volume.get_annotation_value(volume_name, ANNOT_CHECKSUM) logging(f'Checking checksum for file {file_name} in pod {pod_name}') check_pod_data_checksum(expected_checksum, pod_name, file_name) @@ -91,15 +84,30 @@ def wait_for_workloads_pods_running(self, workload_names, namespace="default"): pool.join() - def wait_for_workload_pods_stable(self, workload_name, namespace="default"): + async def wait_for_workloads_pods_stably_running(self, workloads): + logging(f'Waiting for workloads {workloads} pods stable') + + async def wait_for_workloads_tasks(): + tasks = [] + for workload_name in workloads: + tasks.append( + asyncio.create_task(wait_for_workload_pods_stable(workload_name, namespace="default"), name=workload_name) + ) + + done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + logging(f"All workloads {workloads} pods are stably running now") + + await wait_for_workloads_tasks() + + async def wait_for_workload_pods_stable(self, workload_name, namespace="default"): logging(f'Waiting for {namespace} workload {workload_name} pod stable') - wait_for_workload_pods_stable(workload_name, namespace=namespace) + await wait_for_workload_pods_stable(workload_name, namespace=namespace) def wait_for_workload_volume_healthy(self, workload_name): volume_name = get_workload_volume_name(workload_name) logging(f'Waiting for workload {workload_name} volume {volume_name} to be healthy') - self.volume_keywords.wait_for_volume_healthy(volume_name) + self.volume.wait_for_volume_healthy(volume_name) def wait_for_workload_volume_detached(self, workload_name): volume_name = get_workload_volume_name(workload_name) diff --git a/e2e/libs/workload/workload.py b/e2e/libs/workload/workload.py index 8cdca817ef..32393f601a 100644 --- a/e2e/libs/workload/workload.py +++ b/e2e/libs/workload/workload.py @@ -1,6 +1,6 @@ import time import yaml - +import asyncio from kubernetes import client from kubernetes.client.rest import ApiException from kubernetes.stream import stream @@ -168,7 +168,7 @@ def wait_for_workload_pods_running(workload_name, namespace="default"): assert False, f"Timeout waiting for {workload_name} pods running" -def wait_for_workload_pods_stable(workload_name, namespace="default"): +async def wait_for_workload_pods_stable(workload_name, namespace="default"): stable_pods = {} wait_for_stable_retry = {} wait_for_stable_pod = [] @@ -201,6 +201,6 @@ def wait_for_workload_pods_stable(workload_name, namespace="default"): return logging(f"Waiting for {workload_name} pods {wait_for_stable_pod} stable, retry ({i}) ...") - time.sleep(retry_interval) + await asyncio.sleep(retry_interval) assert False, f"Timeout waiting for {workload_name} pods {wait_for_stable_pod} stable)" diff --git a/e2e/tests/kubelet_restart.robot b/e2e/tests/kubelet_restart.robot index 1aa137ddec..0707ec8b22 100644 --- a/e2e/tests/kubelet_restart.robot +++ b/e2e/tests/kubelet_restart.robot @@ -18,26 +18,34 @@ ${RETRY_INTERVAL} 1 *** Test Cases *** Restart Volume Node Kubelet While Workload Heavy Writing Given Create statefulset 0 using RWO volume + And Create statefulset 1 using RWX volume FOR ${i} IN RANGE ${LOOP_COUNT} And Keep writing data to pod of statefulset 0 + And Keep writing data to pod of statefulset 1 - When Stop volume node kubelet of statefulset 0 for 10 seconds + When Stop volume nodes kubelet for 10 seconds statefulset 0 statefulset 1 And Wait for volume of statefulset 0 healthy - And Wait for statefulset 0 pods stable + And Wait for volume of statefulset 1 healthy + And Wait for workloads pods stable statefulset 0 statefulset 1 Then Check statefulset 0 works + And Check statefulset 1 works END Stop Volume Node Kubelet For More Than Pod Eviction Timeout While Workload Heavy Writing Given Create statefulset 0 using RWO volume + And Create statefulset 1 using RWX volume FOR ${i} IN RANGE ${LOOP_COUNT} And Keep writing data to pod of statefulset 0 + And Keep writing data to pod of statefulset 1 - When Stop volume node kubelet of statefulset 0 for 360 seconds + When Stop volume nodes kubelet for 360 seconds statefulset 0 statefulset 1 And Wait for volume of statefulset 0 healthy - And Wait for statefulset 0 pods stable + And Wait for volume of statefulset 1 healthy + And Wait for workloads pods stable statefulset 0 statefulset 1 Then Check statefulset 0 works + And Check statefulset 1 works END