From 26966bd4ae003027a162321467fd7a6fafb51cac Mon Sep 17 00:00:00 2001 From: Yang Chiu Date: Tue, 26 Sep 2023 11:50:10 +0800 Subject: [PATCH] test: implement backupstore setup (1) add backupstore.py for robot test cases (2) only support s3 now, the subprocess parts in backupstore.py need to be refined to make nfs work Signed-off-by: Yang Chiu --- e2e/keywords/common.resource | 4 + e2e/libs/backupstore/__init__.py | 2 + e2e/libs/backupstore/base.py | 198 +++++++++++++++++++ e2e/libs/backupstore/minio.py | 196 ++++++++++++++++++ e2e/libs/backupstore/nfs.py | 125 ++++++++++++ e2e/libs/keywords/backupstore_keywords.py | 55 ++++++ e2e/libs/longhorn.py | 2 + e2e/libs/recurringjob/recurringjob.py | 5 +- e2e/libs/recurringjob/rest.py | 166 ++++++++-------- e2e/libs/utility/utility.py | 9 + e2e/requirements.txt | 1 + pipelines/utilities/run_longhorn_e2e_test.sh | 11 ++ 12 files changed, 690 insertions(+), 84 deletions(-) create mode 100644 e2e/libs/backupstore/__init__.py create mode 100644 e2e/libs/backupstore/base.py create mode 100644 e2e/libs/backupstore/minio.py create mode 100644 e2e/libs/backupstore/nfs.py create mode 100644 e2e/libs/keywords/backupstore_keywords.py diff --git a/e2e/keywords/common.resource b/e2e/keywords/common.resource index 4b722f9bd5..0320d51912 100644 --- a/e2e/keywords/common.resource +++ b/e2e/keywords/common.resource @@ -9,6 +9,8 @@ Library ../libs/keywords/statefulset_keywords.py Library ../libs/keywords/stress_keywords.py Library ../libs/keywords/volume_keywords.py Library ../libs/keywords/workload_keywords.py +Library ../libs/keywords/network_keywords.py +Library ../libs/keywords/backupstore_keywords.py *** Keywords *** Set test environment @@ -17,6 +19,7 @@ Set test environment init_storageclasses setup_control_plane_network_latency + set_backupstore Cleanup test resources cleanup_control_plane_network_latency @@ -28,3 +31,4 @@ Cleanup test resources cleanup_persistentvolumeclaims cleanup_volumes cleanup_storageclasses + cleanup_backupstore diff --git a/e2e/libs/backupstore/__init__.py b/e2e/libs/backupstore/__init__.py new file mode 100644 index 0000000000..774d9a3eb1 --- /dev/null +++ b/e2e/libs/backupstore/__init__.py @@ -0,0 +1,2 @@ +from backupstore.nfs import Nfs +from backupstore.minio import Minio diff --git a/e2e/libs/backupstore/base.py b/e2e/libs/backupstore/base.py new file mode 100644 index 0000000000..6be59b2ec0 --- /dev/null +++ b/e2e/libs/backupstore/base.py @@ -0,0 +1,198 @@ +from abc import ABC, abstractmethod +import os +import time +import hashlib + +SETTING_BACKUP_TARGET = "backup-target" +SETTING_BACKUP_TARGET_CREDENTIAL_SECRET = "backup-target-credential-secret" +SETTING_BACKUPSTORE_POLL_INTERVAL = "backupstore-poll-interval" + +BACKUPSTORE_BV_PREFIX = "/backupstore/volumes/" +BACKUPSTORE_LOCK_DURATION = 150 + +RETRY_COUNT = 300 +RETRY_INTERVAL = 1 + +class Base(ABC): + + def is_backupTarget_s3(self, s): + return s.startswith("s3://") + + def is_backupTarget_nfs(self, s): + return s.startswith("nfs://") + + def get_backupstore_url(self): + backupstore = os.environ['LONGHORN_BACKUPSTORES'] + backupstore = backupstore.replace(" ", "") + backupstores = backupstore.split(",") + assert len(backupstores) != 0 + return backupstores + + def get_backupstore_poll_interval(self): + poll_interval = os.environ['LONGHORN_BACKUPSTORE_POLL_INTERVAL'] + assert len(poll_interval) != 0 + return poll_interval + + @abstractmethod + def set_backupstore(self, client): + return NotImplemented + + def reset_backupstore_setting(self, client): + backup_target_setting = client.by_id_setting(SETTING_BACKUP_TARGET) + client.update(backup_target_setting, value="") + + backup_target_credential_setting = client.by_id_setting( + SETTING_BACKUP_TARGET_CREDENTIAL_SECRET) + client.update(backup_target_credential_setting, value="") + + backup_store_poll_interval = client.by_id_setting( + SETTING_BACKUPSTORE_POLL_INTERVAL) + client.update(backup_store_poll_interval, value="300") + + def set_backupstore_url(self, client, url): + backup_target_setting = client.by_id_setting(SETTING_BACKUP_TARGET) + backup_target_setting = client.update(backup_target_setting, + value=url) + assert backup_target_setting.value == url + + def set_backupstore_credential_secret(self, client, credential_secret): + backup_target_credential_setting = client.by_id_setting( + SETTING_BACKUP_TARGET_CREDENTIAL_SECRET) + backup_target_credential_setting = client.update( + backup_target_credential_setting, value=credential_secret) + assert backup_target_credential_setting.value == credential_secret + + def set_backupstore_poll_interval(self, client, poll_interval): + backup_store_poll_interval_setting = client.by_id_setting( + SETTING_BACKUPSTORE_POLL_INTERVAL) + backup_target_poll_interal_setting = client.update( + backup_store_poll_interval_setting, value=poll_interval) + assert backup_target_poll_interal_setting.value == poll_interval + + def backup_volume_path(self, volume_name): + volume_name_sha512 = \ + hashlib.sha512(volume_name.encode('utf-8')).hexdigest() + + volume_dir_level_1 = volume_name_sha512[0:2] + volume_dir_level_2 = volume_name_sha512[2:4] + + backupstore_bv_path = BACKUPSTORE_BV_PREFIX + \ + volume_dir_level_1 + "/" + \ + volume_dir_level_2 + "/" + \ + volume_name + + return backupstore_bv_path + + @abstractmethod + def get_backup_volume_prefix(self, client, volume_name): + return NotImplemented + + def get_backup_target(self, client): + backup_target_setting = client.by_id_setting(SETTING_BACKUP_TARGET) + return backup_target_setting.value + + def get_secret(self, client): + backup_target_credential_setting = client.by_id_setting( + SETTING_BACKUP_TARGET_CREDENTIAL_SECRET) + return backup_target_credential_setting.value + + @abstractmethod + def get_backup_cfg_file_path(self, client, volume_name, backup_name): + return NotImplemented + + @abstractmethod + def get_volume_cfg_file_path(self, client, volume_name): + return NotImplemented + + @abstractmethod + def get_backup_blocks_dir(self, client, volume_name): + return NotImplemented + + @abstractmethod + def create_file_in_backupstore(self): + return NotImplemented + + @abstractmethod + def write_backup_cfg_file(self, client, core_api, volume_name, backup_name, data): + return NotImplemented + + @abstractmethod + def delete_file_in_backupstore(self): + return NotImplemented + + @abstractmethod + def delete_backup_cfg_file(self): + return NotImplemented + + @abstractmethod + def delete_volume_cfg_file(self): + return NotImplemented + + @abstractmethod + def delete_random_backup_block(self): + return NotImplemented + + @abstractmethod + def count_backup_block_files(self): + return NotImplemented + + def wait_for_lock_expiration(self): + """ + waits 150 seconds which is the lock duration + TODO: once we have implemented the delete functions, + we can switch to removing the locks directly + """ + time.sleep(BACKUPSTORE_LOCK_DURATION) + + def delete_backup_volume(self, client, volume_name): + bv = client.by_id_backupVolume(volume_name) + client.delete(bv) + self.wait_for_backup_volume_delete(client, volume_name) + + def wait_for_backup_volume_delete(self, client, name): + for _ in range(RETRY_COUNT): + bvs = client.list_backupVolume() + found = False + for bv in bvs: + if bv.name == name: + found = True + break + if not found: + break + time.sleep(RETRY_INTERVAL) + assert not found + + def cleanup_backup_volumes(self, client): + backup_volumes = client.list_backup_volume() + + # we delete the whole backup volume, which skips block gc + for backup_volume in backup_volumes: + self.delete_backup_volume(client, backup_volume.name) + + backup_volumes = client.list_backup_volume() + assert backup_volumes.data == [] + + def cleanup_system_backups(self, client): + """ + Clean up all system backups + :param client: The Longhorn client to use in the request. + """ + + system_backups = client.list_system_backup() + for system_backup in system_backups: + # ignore the error when clean up + try: + client.delete(system_backup) + except Exception as e: + name = system_backup['name'] + print("\nException when cleanup system backup ", name) + print(e) + + ok = False + for _ in range(RETRY_COUNT): + system_backups = client.list_system_backup() + if len(system_backups) == 0: + ok = True + break + time.sleep(RETRY_INTERVAL) + assert ok \ No newline at end of file diff --git a/e2e/libs/backupstore/minio.py b/e2e/libs/backupstore/minio.py new file mode 100644 index 0000000000..660f3376d4 --- /dev/null +++ b/e2e/libs/backupstore/minio.py @@ -0,0 +1,196 @@ +from backupstore.base import Base +import os +import base64 +import json +import tempfile +from minio import Minio +from minio.error import ResponseError +from urllib.parse import urlparse + +class Minio(Base): + + def set_backupstore(self, client): + backupstores = self.get_backupstore_url() + poll_interval = self.get_backupstore_poll_interval() + for backupstore in backupstores: + if self.is_backupTarget_s3(backupstore): + backupsettings = backupstore.split("$") + self.set_backupstore_url(client, backupsettings[0]) + self.set_backupstore_credential_secret(client, backupsettings[1]) + self.set_backupstore_poll_interval(client, poll_interval) + break + + def get_api_client(self, client, core_api, minio_secret_name): + secret = core_api.read_namespaced_secret(name=minio_secret_name, + namespace='longhorn-system') + + base64_minio_access_key = secret.data['AWS_ACCESS_KEY_ID'] + base64_minio_secret_key = secret.data['AWS_SECRET_ACCESS_KEY'] + base64_minio_endpoint_url = secret.data['AWS_ENDPOINTS'] + base64_minio_cert = secret.data['AWS_CERT'] + + minio_access_key = \ + base64.b64decode(base64_minio_access_key).decode("utf-8") + minio_secret_key = \ + base64.b64decode(base64_minio_secret_key).decode("utf-8") + + minio_endpoint_url = \ + base64.b64decode(base64_minio_endpoint_url).decode("utf-8") + minio_endpoint_url = minio_endpoint_url.replace('https://', '') + + return Minio(minio_endpoint_url, + access_key=minio_access_key, + secret_key=minio_secret_key, + secure=True) + + def get_backupstore_bucket_name(self, client): + backupstore = self.backupstore_get_backup_target(client) + assert self.is_backupTarget_s3(backupstore) + bucket_name = urlparse(backupstore).netloc.split('@')[0] + return bucket_name + + def get_backupstore_path(self, client): + backupstore = self.backupstore_get_backup_target(client) + assert self.is_backupTarget_s3(backupstore) + backupstore_path = urlparse(backupstore).path.split('$')[0].strip("/") + return backupstore_path + + def get_backup_volume_prefix(self, client, volume_name): + backupstore_bv_path = self.backup_volume_path(volume_name) + backupstore_path = self.get_backupstore_path(client) + return backupstore_path + backupstore_bv_path + + def get_backup_cfg_file_path(self, client, volume_name, backup_name): + prefix = self.get_backup_volume_prefix(client, volume_name) + return prefix + "/backups/backup_" + backup_name + ".cfg" + + def get_volume_cfg_file_path(self, client, volume_name): + prefix = self.get_backup_volume_prefix(client, volume_name) + return prefix + "/volume.cfg" + + def get_backup_blocks_dir(self, client, volume_name): + prefix = self.get_backup_volume_prefix(client, volume_name) + return prefix + "/blocks" + + def create_file_in_backupstore(self, client, core_api, file_path, data={}): # NOQA + backup_target_credential_setting = client.by_id_setting( + SETTING_BACKUP_TARGET_CREDENTIAL_SECRET) + + secret_name = backup_target_credential_setting.value + + minio_api = self.get_api_client(client, core_api, secret_name) + bucket_name = self.get_backupstore_bucket_name(client) + + if len(data) == 0: + data = {"testkey": "test data from mino_create_file_in_backupstore()"} + + with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: + json.dump(data, fp) + try: + with open(fp.name, mode='rb') as f: + temp_file_stat = os.stat(fp.name) + minio_api.put_object(bucket_name, + file_path, + f, + temp_file_stat.st_size) + except ResponseError as err: + print(err) + + def write_backup_cfg_file(self, client, core_api, volume_name, backup_name, backup_cfg_data): # NOQA + secret_name = self.get_secret(client) + assert secret_name != '' + + minio_api = self.get_api_client(client, core_api, secret_name) + bucket_name = self.get_backupstore_bucket_name(client) + minio_backup_cfg_file_path = self.get_backup_cfg_file_path(volume_name, + backup_name) + + with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: + fp.write(str(backup_cfg_data)) + fp.close() + try: + with open(fp.name, mode='rb') as f: + tmp_bkp_cfg_file_stat = os.stat(fp.name) + minio_api.put_object(bucket_name, + minio_backup_cfg_file_path, + f, + tmp_bkp_cfg_file_stat.st_size) + except ResponseError as err: + print(err) + + def delete_file_in_backupstore(self, client, core_api, file_path): + backup_target_credential_setting = client.by_id_setting( + SETTING_BACKUP_TARGET_CREDENTIAL_SECRET) + + secret_name = backup_target_credential_setting.value + + minio_api = self.get_api_client(client, core_api, secret_name) + bucket_name = self.get_backupstore_bucket_name(client) + + try: + minio_api.remove_object(bucket_name, file_path) + except ResponseError as err: + print(err) + + def delete_backup_cfg_file(self, client, core_api, volume_name, backup_name): + secret_name = self.get_secret(client) + assert secret_name != '' + + minio_api = self.get_api_client(client, core_api, secret_name) + bucket_name = self.get_backupstore_bucket_name(client) + minio_backup_cfg_file_path = self.get_backup_cfg_file_path(volume_name, + backup_name) + + try: + minio_api.remove_object(bucket_name, minio_backup_cfg_file_path) + except ResponseError as err: + print(err) + + def delete_volume_cfg_file(self, client, core_api, volume_name): + secret_name = self.get_secret(client) + assert secret_name != '' + + minio_api = self.get_api_client(client, core_api, secret_name) + bucket_name = self.get_backupstore_bucket_name(client) + minio_volume_cfg_file_path = self.get_volume_cfg_file_path(volume_name) + + try: + minio_api.remove_object(bucket_name, minio_volume_cfg_file_path) + except ResponseError as err: + print(err) + + def delete_random_backup_block(self, client, core_api, volume_name): + secret_name = self.get_secret(client) + assert secret_name != '' + + minio_api = self.get_api_client(client, core_api, secret_name) + + bucket_name = self.get_backupstore_bucket_name(client) + backup_blocks_dir = self.get_backup_blocks_dir(volume_name) + + block_object_files = minio_api.list_objects(bucket_name, + prefix=backup_blocks_dir, + recursive=True) + + object_file = block_object_files.__next__().object_name + + try: + minio_api.remove_object(bucket_name, object_file) + except ResponseError as err: + print(err) + + def count_backup_block_files(self, client, core_api, volume_name): + secret_name = self.get_secret(client) + assert secret_name != '' + + minio_api = self.get_api_client(client, core_api, secret_name) + bucket_name = self.get_backupstore_bucket_name(client) + backup_blocks_dir = self.get_backup_blocks_dir(volume_name) + + block_object_files = minio_api.list_objects(bucket_name, + prefix=backup_blocks_dir, + recursive=True) + + block_object_files_list = list(block_object_files) + + return len(block_object_files_list) \ No newline at end of file diff --git a/e2e/libs/backupstore/nfs.py b/e2e/libs/backupstore/nfs.py new file mode 100644 index 0000000000..4654add562 --- /dev/null +++ b/e2e/libs/backupstore/nfs.py @@ -0,0 +1,125 @@ +from backupstore.base import Base +import os +import subprocess +from urllib.parse import urlparse + +class Nfs(Base): + + def set_backupstore(self, client): + backupstores = self.get_backupstore_url() + poll_interval = self.get_backupstore_poll_interval() + for backupstore in backupstores: + if self.is_backupTarget_nfs(backupstore): + self.set_backupstore_url(client, backupstore) + self.set_backupstore_credential_secret(client, "") + self.set_backupstore_poll_interval(client, poll_interval) + self.mount_nfs_backupstore(client) + break + + def mount_nfs_backupstore(self, client, mount_path="/mnt/nfs"): + cmd = ["mkdir", "-p", mount_path] + subprocess.check_output(cmd) + nfs_backuptarget = client.by_id_setting(SETTING_BACKUP_TARGET).value + nfs_url = urlparse(nfs_backuptarget).netloc + \ + urlparse(nfs_backuptarget).path + cmd = ["mount", "-t", "nfs", "-o", "nfsvers=4.2", nfs_url, mount_path] + subprocess.check_output(cmd) + + def umount_nfs_backupstore(self, client, mount_path="/mnt/nfs"): + cmd = ["umount", mount_path] + subprocess.check_output(cmd) + cmd = ["rmdir", mount_path] + subprocess.check_output(cmd) + + def get_nfs_mount_point(self, client): + nfs_backuptarget = client.by_id_setting(SETTING_BACKUP_TARGET).value + nfs_url = urlparse(nfs_backuptarget).netloc + \ + urlparse(nfs_backuptarget).path + + cmd = ["findmnt", "-t", "nfs4", "-n", "--output", "source,target"] + stdout = subprocess.run(cmd, capture_output=True).stdout + mount_info = stdout.decode().strip().split(" ") + + assert mount_info[0] == nfs_url + return mount_info[1] + + def get_backup_volume_prefix(self, client, volume_name): + mount_point = self.get_nfs_mount_point(client) + return mount_point + self.backup_volume_path(volume_name) + + def get_backup_cfg_file_path(self, client, volume_name, backup_name): + prefix = self.get_backup_volume_prefix(client, volume_name) + return prefix + "/backups/backup_" + backup_name + ".cfg" + + def get_volume_cfg_file_path(self, client, volume_name): + prefix = self.get_backup_volume_prefix(client, volume_name) + return prefix + "/volume.cfg" + + def get_backup_blocks_dir(self, client, volume_name): + prefix = self.get_backup_volume_prefix(client, volume_name) + return prefix + "/blocks" + + def create_file_in_backupstore(self, file_path, data={}): + with open(file_path, 'w') as cfg_file: + cfg_file.write(str(data)) + + def write_backup_cfg_file(self, client, volume_name, backup_name, data): + nfs_backup_cfg_file_path = self.get_backup_cfg_file_path(client, + volume_name, + backup_name) + with open(nfs_backup_cfg_file_path, 'w') as cfg_file: + cfg_file.write(str(data)) + + def delete_file_in_backupstore(self, file_path): + try: + os.remove(file_path) + except Exception as ex: + print("error while deleting file:", + file_path) + print(ex) + + def delete_backup_cfg_file(self, client, volume_name, backup_name): + nfs_backup_cfg_file_path = self.get_backup_cfg_file_path(client, + volume_name, + backup_name) + try: + os.remove(nfs_backup_cfg_file_path) + except Exception as ex: + print("error while deleting backup cfg file:", + nfs_backup_cfg_file_path) + print(ex) + + def delete_volume_cfg_file(self, client, volume_name): + nfs_volume_cfg_path = self.get_volume_cfg_file_path(client, volume_name) + try: + os.remove(nfs_volume_cfg_path) + except Exception as ex: + print("error while deleting backup cfg file:", nfs_volume_cfg_path) + print(ex) + + def delete_random_backup_block(self, client, volume_name): + backup_blocks_dir = self.get_backup_blocks_dir(client, volume_name) + cmd = ["find", backup_blocks_dir, "-type", "f"] + find_cmd = subprocess.Popen(cmd, stdout=subprocess.PIPE) + head_cmd = subprocess.check_output(["/usr/bin/head", "-1"], stdin=find_cmd.stdout) + backup_block_file_path = head_cmd.decode().strip() + + try: + os.remove(backup_block_file_path) + except Exception as ex: + print("error while deleting backup block file:", + backup_block_file_path) + print(ex) + + def count_backup_block_files(self, client, volume_name): + backup_blocks_dir = self.get_backup_blocks_dir(client, volume_name) + cmd = ["find", backup_blocks_dir, "-type", "f"] + find_cmd = subprocess.Popen(cmd, stdout=subprocess.PIPE) + wc_cmd = subprocess.check_output(["/usr/bin/wc", "-l"], stdin=find_cmd.stdout) + backup_blocks_count = int(wc_cmd.decode().strip()) + + return backup_blocks_count + + def cleanup_backup_volumes(self, client): + super().cleanup_backup_volumes(client) + self.umount_nfs_backupstore(client) diff --git a/e2e/libs/keywords/backupstore_keywords.py b/e2e/libs/keywords/backupstore_keywords.py new file mode 100644 index 0000000000..db27a4a0d4 --- /dev/null +++ b/e2e/libs/keywords/backupstore_keywords.py @@ -0,0 +1,55 @@ +from backupstore import Nfs, Minio +from utility.utility import get_longhorn_client, get_backupstores +from kubernetes import client + +class backupstore_keywords: + + def __init__(self): + backupstores = get_backupstores() + if backupstores[0] == "s3": + self.backupstore = Minio() + else: + self.backupstore = Nfs() + + def set_backupstore(self): + self.backupstore.set_backupstore(get_longhorn_client()) + + def cleanup_backupstore(self): + client = get_longhorn_client() + self.backupstore.cleanup_system_backups(client) + self.backupstore.cleanup_backup_volumes(client) + self.backupstore.reset_backupstore_setting(client) + + def create_dummy_in_progress_backup(self, volume_name): + client = get_longhorn_client() + core_api = client.CoreV1Api() + + dummy_backup_cfg_data = {"Name": "dummy_backup", + "VolumeName": volume_name, + "CreatedTime": ""} + + self.backupstore.write_backup_cfg_file(client, + core_api, + volume_name, + "backup-dummy", + dummy_backup_cfg_data) + + def delete_dummy_in_progress_backup(self, volume_name): + client = get_longhorn_client() + core_api = client.CoreV1Api() + self.backupstore.delete_backup_cfg_file(client, + core_api, + volume_name, + "backup-dummy") + + def corrupt_backup_cfg_file(self, volume_name, backup_name): + client = get_longhorn_client() + core_api = client.CoreV1Api() + + corrupt_backup_cfg_data = "{corrupt: definitely" + + self.backupstore.write_backup_cfg_file(client, + core_api, + volume_name, + backup_name, + corrupt_backup_cfg_data) \ No newline at end of file diff --git a/e2e/libs/longhorn.py b/e2e/libs/longhorn.py index aea50f2dd8..5afcfe98e3 100644 --- a/e2e/libs/longhorn.py +++ b/e2e/libs/longhorn.py @@ -270,6 +270,8 @@ def cb(_link_name=link_name, return result + if type(obj) == str and '/v1/' in obj: + obj = self._url.replace("/v1/schemas", "") + obj[obj.find("/v1/"):] return obj def object_pairs_hook(self, pairs): diff --git a/e2e/libs/recurringjob/recurringjob.py b/e2e/libs/recurringjob/recurringjob.py index 1029e20064..8c2101c592 100644 --- a/e2e/libs/recurringjob/recurringjob.py +++ b/e2e/libs/recurringjob/recurringjob.py @@ -15,11 +15,14 @@ def __init__(self): else: self.recurringjob = Rest() + # Pushing a backup and a purge once a minute may be a little aggressive + # Changed to every 2 minutes + # ref: https://github.com/longhorn/longhorn/issues/7854 def create(self, job_name, task="snapshot", groups=[], - cron="* * * * *", + cron="*/2 * * * *", retain=1, concurrency=1, labels={}): diff --git a/e2e/libs/recurringjob/rest.py b/e2e/libs/recurringjob/rest.py index ec35b8ab46..299242eb15 100644 --- a/e2e/libs/recurringjob/rest.py +++ b/e2e/libs/recurringjob/rest.py @@ -11,6 +11,7 @@ from utility.utility import filter_cr from utility.utility import get_longhorn_client from utility.utility import logging +from utility.utility import get_retry_count_and_interval class Rest(Base): @@ -18,6 +19,7 @@ class Rest(Base): def __init__(self): self.longhorn_client = get_longhorn_client() self.batch_v1_api = client.BatchV1Api() + self.retry_count, self.retry_interval = get_retry_count_and_interval() def create(self, name, task, groups, cron, retain, concurrency, labels): self.longhorn_client.create_recurring_job( @@ -45,139 +47,137 @@ def add_to_volume(self, job_name, volume_name): def _wait_for_volume_recurringjob_update(self, job_name, volume_name): updated = False - for _ in range(RETRY_COUNTS): + for _ in range(self.retry_count): jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name) if job_name in jobs: updated = True break - time.sleep(RETRY_INTERVAL) + time.sleep(self.retry_interval) assert updated def _wait_for_volume_recurringjob_delete(self, job_name, volume_name): deleted = False - for _ in range(RETRY_COUNTS): + for _ in range(self.retry_count): jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name) if job_name not in jobs: deleted = True break - time.sleep(RETRY_INTERVAL) + time.sleep(self.retry_interval) assert deleted def get_volume_recurringjobs_and_groups(self, volume_name): - volume = self.longhorn_client.by_id_volume(volume_name) - list = volume.recurringJobList() - jobs = [] - groups = [] - for item in list: - if item['isGroup']: - groups.append(item['name']) - else: - jobs.append(item['name']) - return jobs, groups + for _ in range(self.retry_count): + volume = None + try: + volume = self.longhorn_client.by_id_volume(volume_name) + list = volume.recurringJobList() + jobs = [] + groups = [] + for item in list: + if item['isGroup']: + groups.append(item['name']) + else: + jobs.append(item['name']) + return jobs, groups + except Exception as e: + logging(f"Getting volume {volume} recurringjob list error: {e}") + time.sleep(self.retry_interval) def _wait_for_cron_job_create(self, job_name): created = False - for _ in range(RETRY_COUNTS): + for _ in range(self.retry_count): job = self.batch_v1_api.list_namespaced_cron_job( 'longhorn-system', label_selector=f"recurring-job.longhorn.io={job_name}") if len(job.items) != 0: created = True break - time.sleep(RETRY_INTERVAL) + time.sleep(self.retry_interval) assert created def _wait_for_cron_job_delete(self, job_name): deleted = False - for _ in range(RETRY_COUNTS): + for _ in range(self.retry_count): job = self.batch_v1_api.list_namespaced_cron_job( 'longhorn-system', label_selector=f"recurring-job.longhorn.io={job_name}") if len(job.items) == 0: deleted = True break - time.sleep(RETRY_INTERVAL) + time.sleep(self.retry_interval) assert deleted def check_jobs_work(self, volume_name): - # check if snapshot/backup is really created by the - # recurringjob following the cron schedule - # currently only support checking normal snapshot/backup - # every 1 min recurringjob jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name) for job_name in jobs: job = self.get(job_name) logging(f"Checking recurringjob {job}") - if job['task'] == 'snapshot' and job['cron'] == '* * * * *': - period_in_sec = 60 - self._check_snapshot_created_in_time(volume_name, job_name, period_in_sec) - elif job['task'] == 'backup' and job['cron'] == '* * * * *': - period_in_sec = 60 - self._check_backup_created_in_time(volume_name, period_in_sec) - - def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec): + if job['task'] == 'snapshot': + self._check_snapshot_created(volume_name, job_name) + elif job['task'] == 'backup': + self._check_backup_created(volume_name) + + def _check_snapshot_created(self, volume_name, job_name): # check snapshot can be created by the recurringjob current_time = datetime.utcnow() current_timestamp = current_time.timestamp() - + logging(f"Recorded current time = {current_time}, timestamp = {current_timestamp}") label_selector=f"longhornvolume={volume_name}" - - max_iterations = period_in_sec * 10 - for _ in range(max_iterations): - time.sleep(1) - + snapshot_timestamp = 0 + for i in range(self.retry_count): + logging(f"Waiting for {volume_name} new snapshot created ({i}) ...") snapshot_list = filter_cr("longhorn.io", "v1beta2", "longhorn-system", "snapshots", label_selector=label_selector) - - if snapshot_list['items'] is None: - continue - - for item in snapshot_list['items']: - # this snapshot can be created by snapshot or backup recurringjob - # but job_name is in spec.labels.RecurringJob - # and crd doesn't support field selector - # so need to filter by ourselves - if item['spec']['labels'] is None: - continue - - try: - assert item['spec']['labels']['RecurringJob'] == job_name - except AssertionError: - continue - - snapshot_timestamp = datetime.strptime(snapshot_list['items'][0]['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ').timestamp() - - if snapshot_timestamp > current_timestamp: - return - - logging(f"Snapshot {item['metadata']['name']} timestamp = {snapshot_timestamp} is not greater than {current_timestamp}") - - assert False, f"No new snapshot created by recurringjob {job_name} for {volume_name} since {current_time}" - - def _check_backup_created_in_time(self, volume_name, period_in_sec): + try: + if len(snapshot_list['items']) > 0: + for item in snapshot_list['items']: + # this snapshot can be created by snapshot or backup recurringjob + # but job_name is in spec.labels.RecurringJob + # and crd doesn't support field selector + # so need to filter by ourselves + if 'RecurringJob' in item['status']['labels'] and \ + item['status']['labels']['RecurringJob'] == job_name and \ + item['status']['readyToUse'] == True: + snapshot_time = item['metadata']['creationTimestamp'] + snapshot_time = datetime.strptime(snapshot_time, '%Y-%m-%dT%H:%M:%SZ') + snapshot_timestamp = snapshot_time.timestamp() + if snapshot_timestamp > current_timestamp: + logging(f"Got snapshot {item}, create time = {snapshot_time}, timestamp = {snapshot_timestamp}") + return + except Exception as e: + logging(f"Iterating snapshot list error: {e}") + time.sleep(self.retry_interval) + assert False, f"since {current_time},\ + there's no new snapshot created by recurringjob \ + {snapshot_list}" + + def _check_backup_created(self, volume_name): # check backup can be created by the recurringjob current_time = datetime.utcnow() current_timestamp = current_time.timestamp() - + logging(f"Recorded current time = {current_time}, timestamp = {current_timestamp}") label_selector=f"backup-volume={volume_name}" - - max_iterations = period_in_sec * 10 - for _ in range(max_iterations): - time.sleep(1) - + backup_timestamp = 0 + for i in range(self.retry_count): + logging(f"Waiting for {volume_name} new backup created ({i}) ...") backup_list = filter_cr("longhorn.io", "v1beta2", "longhorn-system", "backups", label_selector=label_selector) - - if backup_list['items'] is None: - continue - - for item in backup_list['items']: - backup_timestamp = datetime.strptime(item['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ').timestamp() - - if backup_timestamp > current_timestamp: - return - - logging(f"Backup {item['metadata']['name']} timestamp = {backup_timestamp} is not greater than {current_timestamp}") - - assert False, f"No new backup created by recurringjob for {volume_name} since {current_time}" + try: + if len(backup_list['items']) > 0: + for item in backup_list['items']: + state = item['status']['state'] + if state != "Completed": + continue + backup_time = item['metadata']['creationTimestamp'] + backup_time = datetime.strptime(backup_time, '%Y-%m-%dT%H:%M:%SZ') + backup_timestamp = backup_time.timestamp() + if backup_timestamp > current_timestamp: + logging(f"Got backup {item}, create time = {backup_time}, timestamp = {backup_timestamp}") + return + except Exception as e: + logging(f"Iterating backup list error: {e}") + time.sleep(self.retry_interval) + assert False, f"since {current_time},\ + there's no new backup created by recurringjob \ + {backup_list}" def cleanup(self, volume_names): for volume_name in volume_names: @@ -185,4 +185,4 @@ def cleanup(self, volume_names): jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name) for job in jobs: logging(f"Deleting recurringjob {job}") - self.delete(job, volume_name) + self.delete(job, volume_name) \ No newline at end of file diff --git a/e2e/libs/utility/utility.py b/e2e/libs/utility/utility.py index 4a9746c7a5..1af3210b83 100644 --- a/e2e/libs/utility/utility.py +++ b/e2e/libs/utility/utility.py @@ -187,3 +187,12 @@ def get_name_suffix(*args): if arg: suffix += f"-{arg}" return suffix + + +def get_backupstores(): + backupstore = os.environ['LONGHORN_BACKUPSTORES'] + backupstore = backupstore.replace(" ", "") + backupstores = backupstore.split(",") + for i in range(len(backupstores)): + backupstores[i] = backupstores[i].split(":")[0] + return backupstores diff --git a/e2e/requirements.txt b/e2e/requirements.txt index 1322cc411d..15b78eca42 100644 --- a/e2e/requirements.txt +++ b/e2e/requirements.txt @@ -6,3 +6,4 @@ kubernetes==27.2.0 requests==2.31.0 boto3==1.34.55 pyyaml==6.0.1 +minio==5.0.10 \ No newline at end of file diff --git a/pipelines/utilities/run_longhorn_e2e_test.sh b/pipelines/utilities/run_longhorn_e2e_test.sh index e79f5012c9..9c0da7a84f 100755 --- a/pipelines/utilities/run_longhorn_e2e_test.sh +++ b/pipelines/utilities/run_longhorn_e2e_test.sh @@ -66,6 +66,13 @@ run_longhorn_e2e_test(){ run_longhorn_e2e_test_out_of_cluster(){ + if [[ ${BACKUP_STORE_TYPE} == "s3" ]]; then + LONGHORN_BACKUPSTORES='s3://backupbucket@us-east-1/backupstore$minio-secret' + elif [[ $BACKUP_STORE_TYPE = "nfs" ]]; then + LONGHORN_BACKUPSTORES='nfs://longhorn-test-nfs-svc.default:/opt/backupstore' + fi + LONGHORN_BACKUPSTORE_POLL_INTERVAL="30" + eval "ROBOT_COMMAND_ARGS=($PYTEST_CUSTOM_OPTIONS)" if [[ -n ${LONGHORN_TESTS_CUSTOM_IMAGE} ]]; then @@ -74,6 +81,8 @@ run_longhorn_e2e_test_out_of_cluster(){ CONTAINER_NAME="e2e-container-${IMAGE_NAME}" docker run --pull=always \ --name "${CONTAINER_NAME}" \ + -e LONGHORN_BACKUPSTORES="${LONGHORN_BACKUPSTORES}" \ + -e LONGHORN_BACKUPSTORE_POLL_INTERVAL="${LONGHORN_BACKUPSTORE_POLL_INTERVAL}" \ -e AWS_ACCESS_KEY_ID="${TF_VAR_lh_aws_access_key}" \ -e AWS_SECRET_ACCESS_KEY="${TF_VAR_lh_aws_secret_key}" \ -e AWS_DEFAULT_REGION="${TF_VAR_aws_region}" \ @@ -84,6 +93,8 @@ run_longhorn_e2e_test_out_of_cluster(){ docker stop "${CONTAINER_NAME}" docker rm "${CONTAINER_NAME}" else + export LONGHORN_BACKUPSTORES=${LONGHORN_BACKUPSTORES} + export LONGHORN_BACKUPSTORE_POLL_INTERVAL=${LONGHORN_BACKUPSTORE_POLL_INTERVAL} cd e2e python3 -m venv . source bin/activate