
Commit

Merge branch 'master' into force-drain
Signed-off-by: yangchiu <[email protected]>
yangchiu authored May 3, 2024
2 parents ee4b0a8 + 482e755 commit c89c464
Showing 13 changed files with 255 additions and 90 deletions.
2 changes: 2 additions & 0 deletions e2e/keywords/common.resource
@@ -13,6 +13,7 @@ Library ../libs/keywords/persistentvolumeclaim_keywords.py
Library ../libs/keywords/network_keywords.py
Library ../libs/keywords/backupstore_keywords.py
Library ../libs/keywords/storageclass_keywords.py
Library ../libs/keywords/node_keywords.py

*** Keywords ***
Set test environment
@@ -34,3 +35,4 @@ Cleanup test resources
cleanup_volumes
cleanup_storageclasses
cleanup_backupstore
cleanup_disks
12 changes: 12 additions & 0 deletions e2e/keywords/node.resource
@@ -0,0 +1,12 @@
*** Settings ***
Documentation Node Keywords
Library ../libs/keywords/common_keywords.py
Library ../libs/keywords/node_keywords.py

*** Keywords ***
Add ${disk_type} type disk ${disk_path} for all worker nodes
${worker_nodes}= get_worker_nodes
FOR ${worker_node} IN @{worker_nodes}
add_disk ${worker_node} ${disk_type} ${disk_path}
END
6 changes: 3 additions & 3 deletions e2e/libs/keywords/host_keywords.py
@@ -32,13 +32,13 @@ def reboot_node_by_index(self, idx, power_off_time_in_min=1):
node_name = self.node.get_node_by_index(idx)
reboot_down_time_sec = int(power_off_time_in_min) * 60

logging(f'Rebooting node {node_name} with downtime {power_off_time_in_min} minutes')
logging(f'Rebooting node {node_name} with downtime {reboot_down_time_sec} seconds')
self.host.reboot_node(node_name, reboot_down_time_sec)

def reboot_all_worker_nodes(self, power_off_time_in_min=1):
reboot_down_time_sec = int(power_off_time_in_min) * 60

logging(f'Rebooting all worker nodes with downtime {power_off_time_in_min} minutes')
logging(f'Rebooting all worker nodes with downtime {reboot_down_time_sec} seconds')
self.host.reboot_all_worker_nodes(reboot_down_time_sec)

def reboot_all_nodes(self):
@@ -48,5 +48,5 @@ def reboot_all_nodes(self):
def reboot_node_by_name(self, node_name, downtime_in_min=1):
reboot_down_time_sec = int(downtime_in_min) * 60

logging(f'Rebooting node {node_name} with downtime {downtime_in_min} minutes')
logging(f'Rebooting node {node_name} with downtime {reboot_down_time_sec} seconds')
self.host.reboot_node(node_name, reboot_down_time_sec)
24 changes: 24 additions & 0 deletions e2e/libs/keywords/node_keywords.py
@@ -0,0 +1,24 @@
from node import Node
from utility.utility import logging

class node_keywords:

def __init__(self):
self.node = Node()

def add_disk(self, node_name, type, path):
logging(f"Adding {type} type disk {path} to node {node_name}")
disk = {
f"{type}-disk": {
"diskType": type,
"path": path,
"allowScheduling": True
}
}
self.node.add_disk(node_name, disk)

def cleanup_disks(self):
nodes = self.node.list_node_names_by_role("worker")
for node_name in nodes:
logging(f"Resetting node {node_name} disks to default")
self.node.reset_disks(node_name)
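
For reference, a minimal usage sketch of the new keyword library when called directly from Python (node names, disk type, and path are illustrative; the import path assumes e2e/libs is on PYTHONPATH):

from keywords.node_keywords import node_keywords  # assumed module path under e2e/libs

kw = node_keywords()
for worker in ["worker-1", "worker-2"]:        # example node names
    kw.add_disk(worker, "block", "/dev/xvdh")  # adds a "block-disk" entry with allowScheduling=True
# ... run the test ...
kw.cleanup_disks()                             # teardown: every worker goes back to the default /var/lib/longhorn/ disk
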
89 changes: 28 additions & 61 deletions e2e/libs/keywords/volume_keywords.py
@@ -24,9 +24,9 @@ def cleanup_volumes(self):
for volume in volumes['items']:
self.delete_volume(volume['metadata']['name'])

def create_volume(self, volume_name, size, replica_count, frontend="blockdev", migratable=False, access_mode="RWO"):
def create_volume(self, volume_name, size, replica_count, frontend="blockdev", migratable=False, access_mode="RWO", data_engine="v1"):
logging(f'Creating volume {volume_name}')
self.volume.create(volume_name, size, replica_count, frontend, migratable, access_mode)
self.volume.create(volume_name, size, replica_count, frontend, migratable, access_mode, data_engine)

def delete_volume(self, volume_name):
logging(f'Deleting volume {volume_name}')
@@ -52,13 +52,6 @@ def wait_for_volume_expand_to_size(self, volume_name, size):
logging(f'Waiting for volume {volume_name} expand to {size}')
return self.volume.wait_for_volume_expand_to_size(volume_name, size)

def get_replica_node_ids(self, volume_name):
node_ids = []
node_ids.extend(self.get_node_ids_by_replica_locality(volume_name, "volume node"))
node_ids.extend(self.get_node_ids_by_replica_locality(volume_name, "replica node"))
node_ids.extend(self.get_node_ids_by_replica_locality(volume_name, "test pod node"))
return node_ids

def get_replica_node(self, volume_name):
return self.get_node_id_by_replica_locality(volume_name, "replica node")

@@ -78,14 +71,9 @@ def get_node_ids_by_replica_locality(self, volume_name, replica_locality):
worker_nodes = self.node.list_node_names_by_role("worker")
volume_node = self.get_node_id_by_replica_locality(volume_name, "volume node")
replica_nodes = [node for node in worker_nodes if node != volume_node]
test_pod_node = self.node.get_test_pod_running_node()

if replica_locality == "test pod node":
if test_pod_node in replica_nodes:
return [test_pod_node]

elif replica_locality == "replica node":
return [node for node in replica_nodes if node != test_pod_node]
if replica_locality == "replica node":
return replica_nodes

else:
raise ValueError(f"Unknown replica locality {replica_locality}")
@@ -108,64 +96,43 @@ def check_data_checksum(self, volume_name):
logging(f"Checking volume {volume_name} data checksum is {checksum}")
self.volume.check_data_checksum(volume_name, checksum)

def delete_replica(self, volume_name, replica_node):
if str(replica_node).isdigit():
replica_node = self.node.get_node_by_index(replica_node)

logging(f"Deleting volume {volume_name}'s replica on node {replica_node}")
self.volume.delete_replica(volume_name, replica_node)

def delete_replica_on_node(self, volume_name, replica_locality):
check_replica_locality(replica_locality)

node_id = self.get_node_id_by_replica_locality(volume_name, replica_locality)
node_name = None
if index := self.node.is_accessing_node_by_index(replica_locality):
node_name = self.node.get_node_by_index(index)
else:
node_name = self.get_node_id_by_replica_locality(volume_name, replica_locality)

logging(f"Deleting volume {volume_name}'s replica on node {node_id}")
self.volume.delete_replica(volume_name, node_id)
logging(f"Deleting volume {volume_name}'s replica on node {node_name}")
self.volume.delete_replica(volume_name, node_name)

def set_annotation(self, volume_name, annotation_key, annotation_value):
self.volume.set_annotation(volume_name, annotation_key, annotation_value)

async def wait_for_replica_rebuilding_start(self, volume_name, replica_node):
if str(replica_node).isdigit():
replica_node = self.node.get_node_by_index(replica_node)

logging(f"Waiting for volume {volume_name}'s replica on node {replica_node} rebuilding started")
await self.volume.wait_for_replica_rebuilding_start(
volume_name,
replica_node
)

async def wait_for_replica_rebuilding_to_start_on_node(self, volume_name, replica_locality):
check_replica_locality(replica_locality)

node_id = self.get_node_id_by_replica_locality(volume_name, replica_locality)

logging(f"Waiting for volume {volume_name}'s replica on node {node_id} rebuilding started")
await self.volume.wait_for_replica_rebuilding_start(volume_name, node_id)

def wait_for_replica_rebuilding_complete(self, volume_name, replica_node):
if str(replica_node).isdigit():
replica_node = self.node.get_node_by_index(replica_node)
node_name = None
if index := self.node.is_accessing_node_by_index(replica_locality):
node_name = self.node.get_node_by_index(index)
else:
node_name = self.get_node_id_by_replica_locality(volume_name, replica_locality)

logging(f"Waiting for volume {volume_name}'s replica on node {replica_node} rebuilding completed")
self.volume.wait_for_replica_rebuilding_complete(
volume_name,
replica_node
)
logging(f"Waiting for volume {volume_name}'s replica on node {node_name} rebuilding started")
await self.volume.wait_for_replica_rebuilding_start(volume_name, node_name)

def wait_for_replica_rebuilding_to_complete_on_node(self, volume_name, replica_locality):
check_replica_locality(replica_locality)

node_id = self.get_node_id_by_replica_locality(volume_name, replica_locality)
node_name = None
if index := self.node.is_accessing_node_by_index(replica_locality):
node_name = self.node.get_node_by_index(index)
else:
node_name = self.get_node_id_by_replica_locality(volume_name, replica_locality)

logging(f"Waiting for volume {volume_name}'s replica on node {node_id} rebuilding completed")
self.volume.wait_for_replica_rebuilding_complete(volume_name, node_id)
logging(f"Waiting for volume {volume_name}'s replica on node {node_name} rebuilding completed")
self.volume.wait_for_replica_rebuilding_complete(volume_name, node_name)

def wait_for_replica_rebuilding_to_complete(self, volume_name):
for node_id in self.get_replica_node_ids(volume_name):
logging(f"Waiting for volume {volume_name}'s replica on node {node_id} rebuilding completed")
self.volume.wait_for_replica_rebuilding_complete(volume_name, node_id)
for node_name in self.node.list_node_names_by_role("worker"):
logging(f"Waiting for volume {volume_name}'s replica on node {node_name} rebuilding completed")
self.volume.wait_for_replica_rebuilding_complete(volume_name, node_name)

async def only_one_replica_rebuilding_will_start_at_a_time_on_node(self, volume_name_0, volume_name_1, replica_locality):

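These keywords now accept either a locality phrase ("volume node", "replica node") or an index phrase ("node 0", "node 1", ...). A sketch of the shared resolution logic, written as a hypothetical standalone helper (the real code inlines this branch in each keyword; import paths are assumptions):

from node import Node                                   # e2e/libs/node/node.py
from keywords.volume_keywords import volume_keywords    # assumed module path under e2e/libs

def resolve_target_node(volume_kw, volume_name, locator):
    # Index phrases win; locality phrases fall back to the volume's replica placement.
    node = Node()
    if index := node.is_accessing_node_by_index(locator):  # the digit comes back as a string, so "0" stays truthy
        return node.get_node_by_index(index)
    return volume_kw.get_node_id_by_replica_locality(volume_name, locator)

# e.g. resolve_target_node(volume_keywords(), "vol-0", "node 1") on a running cluster
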
89 changes: 73 additions & 16 deletions e2e/libs/node/node.py
@@ -1,17 +1,87 @@
import os
import time

import re
from kubernetes import client

from robot.libraries.BuiltIn import BuiltIn

from utility.utility import get_longhorn_client
from utility.utility import get_retry_count_and_interval
from utility.utility import logging


class Node:

DEFAULT_DISK_PATH = "/var/lib/longhorn/"

def __init__(self):
pass
self.longhorn_client = get_longhorn_client()
self.retry_count, self.retry_interval = get_retry_count_and_interval()

def update_disks(self, node_name, disks):
node = self.longhorn_client.by_id_node(node_name)
for _ in range(self.retry_count):
try:
node.diskUpdate(disks=disks)
break
except Exception as e:
logging(f"Updating node {node_name} disk error: {e}")
time.sleep(self.retry_interval)

def add_disk(self, node_name, disk):
node = self.longhorn_client.by_id_node(node_name)
disks = node.disks
disks.update(disk)
self.update_disks(node_name, disks)

def reset_disks(self, node_name):
node = self.longhorn_client.by_id_node(node_name)

for disk_name, disk in iter(node.disks.items()):
if disk.path != self.DEFAULT_DISK_PATH:
disk.allowScheduling = False
self.update_disks(node_name, node.disks)

disks = {}
for disk_name, disk in iter(node.disks.items()):
if disk.path == self.DEFAULT_DISK_PATH:
disks[disk_name] = disk
else:
logging(f"Try to remove disk {disk_name} from node {node_name}")
self.update_disks(node_name, disks)

def get_all_pods_on_node(self, node_name):
api = client.CoreV1Api()
all_pods = api.list_namespaced_pod(namespace='longhorn-system', field_selector='spec.nodeName=' + node_name)
user_pods = [p for p in all_pods.items if (p.metadata.namespace != 'kube-system')]
return user_pods

def wait_all_pods_evicted(self, node_name):
retry_count, retry_interval = get_retry_count_and_interval()
for _ in range(retry_count):
pods = self.get_all_pods_on_node(node_name)
evicted = True
for pod in pods:
# check non DaemonSet Pods are evicted or terminating (deletionTimestamp != None)
pod_type = pod.metadata.owner_references[0].kind
pod_delete_timestamp = pod.metadata.deletion_timestamp

if pod_type != 'DaemonSet' and pod_delete_timestamp == None:
evicted = False
break

if evicted:
break

time.sleep(retry_interval)

assert evicted, 'failed to evict pods'

def is_accessing_node_by_index(self, node):
p = re.compile('node (\d)')
if m := p.match(node):
return m.group(1)
else:
return None

def get_node_by_index(self, index, role="worker"):
nodes = self.list_node_names_by_role(role)
@@ -21,19 +91,6 @@ def get_node_by_name(self, node_name):
core_api = client.CoreV1Api()
return core_api.read_node(node_name)

def get_test_pod_running_node(self):
if "NODE_NAME" in os.environ:
return os.environ["NODE_NAME"]
else:
return self.get_node_by_index(0)

def get_test_pod_not_running_node(self):
worker_nodes = self.list_node_names_by_role("worker")
test_pod_running_node = self.get_test_pod_running_node()
for worker_node in worker_nodes:
if worker_node != test_pod_running_node:
return worker_node

def get_node_cpu_cores(self, node_name):
node = self.get_node_by_name(node_name)
return node.status.capacity['cpu']
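
For reference, how the new index helper behaves (constructing Node requires a reachable Longhorn API; expected return values shown as comments):

from node import Node   # the class above, imported the same way node_keywords.py does

node = Node()
node.is_accessing_node_by_index("node 0")        # -> "0"  (a truthy string, so the walrus checks work even for index 0)
node.is_accessing_node_by_index("node 2")        # -> "2"
node.is_accessing_node_by_index("volume node")   # -> None (locality phrases do not match 'node (\d)')
node.is_accessing_node_by_index("replica node")  # -> None
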
2 changes: 1 addition & 1 deletion e2e/libs/node/utility.py
@@ -1,2 +1,2 @@
def check_replica_locality(replica_locality):
assert replica_locality in ["replica node", "test pod node", "volume node"], f"Unknown replica locality: {replica_locality}: "
assert replica_locality in ["replica node", "volume node"], f"Unknown replica locality: {replica_locality}: "
5 changes: 3 additions & 2 deletions e2e/libs/volume/crd.py
@@ -21,7 +21,7 @@ def __init__(self, node_exec):
self.retry_count, self.retry_interval = get_retry_count_and_interval()
self.engine = Engine()

def create(self, volume_name, size, replica_count, frontend, migratable, access_mode):
def create(self, volume_name, size, replica_count, frontend, migratable, access_mode, data_engine):
size = str(int(size) * GIBIBYTE)
access_mode = access_mode.lower()
body = {
@@ -39,7 +39,8 @@ def create(self, volume_name, size, replica_count, frontend, migratable, access_
"size": size,
"numberOfReplicas": int(replica_count),
"migratable": migratable,
"accessMode": access_mode
"accessMode": access_mode,
"dataEngine": data_engine
}
}
try:
4 changes: 2 additions & 2 deletions e2e/libs/volume/volume.py
@@ -18,8 +18,8 @@ def __init__(self):
else:
self.volume = Rest(node_exec)

def create(self, volume_name, size, replica_count, frontend, migratable, access_mode):
return self.volume.create(volume_name, size, replica_count, frontend, migratable, access_mode)
def create(self, volume_name, size, replica_count, frontend, migratable, access_mode, data_engine):
return self.volume.create(volume_name, size, replica_count, frontend, migratable, access_mode, data_engine)

def delete(self, volume_name):
return self.volume.delete(volume_name)
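
The new data_engine argument is threaded from volume_keywords.create_volume through volume.py into crd.py, where it lands on the volume spec's dataEngine field. A sketch of requesting a v2-data-engine volume from the keyword layer (volume name and size are examples; data_engine defaults to "v1" when omitted):

from keywords.volume_keywords import volume_keywords   # assumed module path under e2e/libs

vol_kw = volume_keywords()
vol_kw.create_volume("vol-v2", size=2, replica_count=3, data_engine="v2")  # frontend, migratable, access_mode keep their defaults
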
11 changes: 6 additions & 5 deletions e2e/tests/replica_rebuilding.robot
@@ -17,14 +17,15 @@ ${RETRY_INTERVAL} 1
Delete Replica While Replica Rebuilding
Given Create volume 0 with 2 GB and 3 replicas
And Attach volume 0
And Wait for volume 0 healthy
And Write data to volume 0

FOR ${i} IN RANGE ${LOOP_COUNT}
When Delete volume 0 replica on volume node
And Wait until volume 0 replica rebuilding started on volume node
And Delete volume 0 replica on replica node
And Wait until volume 0 replica rebuilding completed on volume node
And Delete volume 0 replica on test pod node
When Delete volume 0 replica on node 0
And Wait until volume 0 replica rebuilding started on node 0
And Delete volume 0 replica on node 1
And Wait until volume 0 replica rebuilding completed on node 0
And Delete volume 0 replica on node 2

Then Check volume 0 data is intact
And Wait until volume 0 replicas rebuilding completed