From 1d42c1cc1dd5c4e6d664b14539d934376894fcf9 Mon Sep 17 00:00:00 2001
From: yangchiu <yang.chiu@suse.com>
Date: Mon, 15 Apr 2024 09:26:19 +0800
Subject: [PATCH] test: implement backupstore setup (#1539)

(1) add backupstore.py for robot test cases
(2) only support s3 now, the subprocess parts in
    backupstore.py need to be refined to make
    nfs work

Signed-off-by: Yang Chiu <yang.chiu@suse.com>
---
 e2e/keywords/common.resource                 |   4 +
 e2e/libs/backupstore/__init__.py             |   2 +
 e2e/libs/backupstore/base.py                 | 142 +++++++++++++++
 e2e/libs/backupstore/minio.py                | 180 +++++++++++++++++++
 e2e/libs/backupstore/nfs.py                  | 114 ++++++++++++
 e2e/libs/keywords/backupstore_keywords.py    |  58 ++++++
 e2e/libs/longhorn.py                         |   2 +
 e2e/libs/recurringjob/recurringjob.py        |   5 +-
 e2e/libs/recurringjob/rest.py                | 162 +++++++++--------
 e2e/libs/setting/__init__.py                 |   1 +
 e2e/libs/setting/constant.py                 |   3 +
 e2e/libs/setting/setting.py                  |  73 ++++++++
 e2e/requirements.txt                         |   1 +
 pipelines/utilities/run_longhorn_e2e_test.sh |  11 ++
 14 files changed, 675 insertions(+), 83 deletions(-)
 create mode 100644 e2e/libs/backupstore/__init__.py
 create mode 100644 e2e/libs/backupstore/base.py
 create mode 100644 e2e/libs/backupstore/minio.py
 create mode 100644 e2e/libs/backupstore/nfs.py
 create mode 100644 e2e/libs/keywords/backupstore_keywords.py
 create mode 100644 e2e/libs/setting/__init__.py
 create mode 100644 e2e/libs/setting/constant.py
 create mode 100644 e2e/libs/setting/setting.py

diff --git a/e2e/keywords/common.resource b/e2e/keywords/common.resource
index 40c37db2e9..85c1cdf7ca 100644
--- a/e2e/keywords/common.resource
+++ b/e2e/keywords/common.resource
@@ -10,6 +10,8 @@ Library             ../libs/keywords/stress_keywords.py
 Library             ../libs/keywords/volume_keywords.py
 Library             ../libs/keywords/workload_keywords.py
 Library             ../libs/keywords/persistentvolumeclaim_keywords.py
+Library             ../libs/keywords/network_keywords.py
+Library             ../libs/keywords/backupstore_keywords.py
 
 *** Keywords ***
 Set test environment
@@ -18,6 +20,7 @@ Set test environment
     init_storageclasses
 
     setup_control_plane_network_latency
+    set_backupstore
 
 Cleanup test resources
     cleanup_control_plane_network_latency
@@ -29,3 +32,4 @@ Cleanup test resources
     cleanup_persistentvolumeclaims
     cleanup_volumes
     cleanup_storageclasses
+    cleanup_backupstore
diff --git a/e2e/libs/backupstore/__init__.py b/e2e/libs/backupstore/__init__.py
new file mode 100644
index 0000000000..774d9a3eb1
--- /dev/null
+++ b/e2e/libs/backupstore/__init__.py
@@ -0,0 +1,2 @@
+from backupstore.nfs import Nfs
+from backupstore.minio import Minio
diff --git a/e2e/libs/backupstore/base.py b/e2e/libs/backupstore/base.py
new file mode 100644
index 0000000000..0fea0a4ce5
--- /dev/null
+++ b/e2e/libs/backupstore/base.py
@@ -0,0 +1,142 @@
+from abc import ABC, abstractmethod
+import time
+import os
+import hashlib
+from utility.utility import get_retry_count_and_interval
+from setting import Setting
+
+class Base(ABC):
+
+    def is_backupTarget_s3(self, s):
+        return s.startswith("s3://")
+
+    def is_backupTarget_nfs(self, s):
+        return s.startswith("nfs://")
+
+    @classmethod
+    def get_backupstores(cls):
+        backupstore = os.environ['LONGHORN_BACKUPSTORES']
+        backupstore = backupstore.replace(" ", "")
+        backupstores = backupstore.split(",")
+        for i in range(len(backupstores)):
+            backupstores[i] = backupstores[i].split(":")[0]
+        return backupstores
+
+    def backup_volume_path(self, volume_name):
+        volume_name_sha512 = \
+            hashlib.sha512(volume_name.encode('utf-8')).hexdigest()
+
+        volume_dir_level_1 = volume_name_sha512[0:2]
+        volume_dir_level_2 = volume_name_sha512[2:4]
+
+        backupstore_bv_path = "/backupstore/volumes/" + \
+            volume_dir_level_1 + "/" + \
+            volume_dir_level_2 + "/" + \
+            volume_name
+
+        return backupstore_bv_path
+
+    @abstractmethod
+    def get_backup_volume_prefix(self, client, volume_name):
+        return NotImplemented
+
+    def get_backup_target(self):
+        return Setting().get_backup_target()
+
+    def get_secret(self):
+        return Setting().get_secret()
+
+    @abstractmethod
+    def get_backup_cfg_file_path(self, client, volume_name, backup_name):
+        return NotImplemented
+
+    @abstractmethod
+    def get_volume_cfg_file_path(self, client, volume_name):
+        return NotImplemented
+
+    @abstractmethod
+    def get_backup_blocks_dir(self, client, volume_name):
+        return NotImplemented
+
+    @abstractmethod
+    def create_file_in_backupstore(self):
+        return NotImplemented
+
+    @abstractmethod
+    def write_backup_cfg_file(self, client, core_api, volume_name, backup_name, data):
+        return NotImplemented
+
+    @abstractmethod
+    def delete_file_in_backupstore(self):
+        return NotImplemented
+
+    @abstractmethod
+    def delete_backup_cfg_file(self):
+        return NotImplemented
+
+    @abstractmethod
+    def delete_volume_cfg_file(self):
+        return NotImplemented
+
+    @abstractmethod
+    def delete_random_backup_block(self):
+        return NotImplemented
+
+    @abstractmethod
+    def count_backup_block_files(self):
+        return NotImplemented
+
+    def delete_backup_volume(self, client, volume_name):
+        bv = client.by_id_backupVolume(volume_name)
+        client.delete(bv)
+        self.wait_for_backup_volume_delete(client, volume_name)
+
+    def wait_for_backup_volume_delete(self, client, name):
+        retry_count, retry_interval = get_retry_count_and_interval()
+        for _ in range(retry_count):
+            bvs = client.list_backupVolume()
+            found = False
+            for bv in bvs:
+                if bv.name == name:
+                    found = True
+                    break
+            if not found:
+                break
+            time.sleep(retry_interval)
+        assert not found
+
+    def cleanup_backup_volumes(self, client):
+        backup_volumes = client.list_backup_volume()
+
+        # we delete the whole backup volume, which skips block gc
+        for backup_volume in backup_volumes:
+            self.delete_backup_volume(client, backup_volume.name)
+
+        backup_volumes = client.list_backup_volume()
+        assert backup_volumes.data == []
+
+    def cleanup_system_backups(self, client):
+        """
+        Clean up all system backups
+        :param client: The Longhorn client to use in the request.
+        """
+
+        system_backups = client.list_system_backup()
+        for system_backup in system_backups:
+            # ignore the error when clean up
+            try:
+                client.delete(system_backup)
+            except Exception as e:
+                name = system_backup['name']
+                print("\nException when cleanup system backup ", name)
+                print(e)
+
+        ok = False
+        retry_count, retry_interval = get_retry_count_and_interval()
+        for _ in range(retry_count):
+            system_backups = client.list_system_backup()
+            if len(system_backups) == 0:
+                ok = True
+                break
+            time.sleep(retry_interval)
+        assert ok
diff --git a/e2e/libs/backupstore/minio.py b/e2e/libs/backupstore/minio.py
new file mode 100644
index 0000000000..15bf61bd22
--- /dev/null
+++ b/e2e/libs/backupstore/minio.py
@@ -0,0 +1,180 @@
+from backupstore.base import Base
+import os
+import base64
+import json
+import tempfile
+from minio import Minio
+from minio.error import ResponseError
+from urllib.parse import urlparse
+
+class Minio(Base):
+
+    def get_api_client(self, client, core_api, minio_secret_name):
+        secret = core_api.read_namespaced_secret(name=minio_secret_name,
+                                                 namespace='longhorn-system')
+
+        base64_minio_access_key = secret.data['AWS_ACCESS_KEY_ID']
+        base64_minio_secret_key = secret.data['AWS_SECRET_ACCESS_KEY']
+        base64_minio_endpoint_url = secret.data['AWS_ENDPOINTS']
+
+        minio_access_key = \
+            base64.b64decode(base64_minio_access_key).decode("utf-8")
+        minio_secret_key = \
+            base64.b64decode(base64_minio_secret_key).decode("utf-8")
+
+        minio_endpoint_url = \
+            base64.b64decode(base64_minio_endpoint_url).decode("utf-8")
+        minio_endpoint_url = minio_endpoint_url.replace('https://', '')
+
+        return Minio(minio_endpoint_url,
+                     access_key=minio_access_key,
+                     secret_key=minio_secret_key,
+                     secure=True)
+
+    def get_backupstore_bucket_name(self, client):
+        backupstore = self.get_backup_target()
+        assert self.is_backupTarget_s3(backupstore)
+        bucket_name = urlparse(backupstore).netloc.split('@')[0]
+        return bucket_name
+
+    def get_backupstore_path(self, client):
+        backupstore = self.get_backup_target()
+        assert self.is_backupTarget_s3(backupstore)
+        backupstore_path = urlparse(backupstore).path.split('$')[0].strip("/")
+        return backupstore_path
+
+    def get_backup_volume_prefix(self, client, volume_name):
+        backupstore_bv_path = self.backup_volume_path(volume_name)
+        backupstore_path = self.get_backupstore_path(client)
+        return backupstore_path + backupstore_bv_path
+
+    def get_backup_cfg_file_path(self, client, volume_name, backup_name):
+        prefix = self.get_backup_volume_prefix(client, volume_name)
+        return prefix + "/backups/backup_" + backup_name + ".cfg"
+
+    def get_volume_cfg_file_path(self, client, volume_name):
+        prefix = self.get_backup_volume_prefix(client, volume_name)
+        return prefix + "/volume.cfg"
+
+    def get_backup_blocks_dir(self, client, volume_name):
+        prefix = self.get_backup_volume_prefix(client, volume_name)
+        return prefix + "/blocks"
+
+    def create_file_in_backupstore(self, client, core_api, file_path, data={}): # NOQA
+
+        secret_name = self.get_secret()
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+        bucket_name = self.get_backupstore_bucket_name(client)
+
+        if len(data) == 0:
+            data = {"testkey": "test data from mino_create_file_in_backupstore()"}
+
+        with tempfile.NamedTemporaryFile(delete_on_close=False) as fp:
+            json.dump(data, fp)
+            try:
+                with open(fp.name, mode='rb') as f:
+                    temp_file_stat = os.stat(fp.name)
+                    minio_api.put_object(bucket_name,
+                                         file_path,
+                                         f,
+                                         temp_file_stat.st_size)
+            except ResponseError as err:
+                print(err)
+
+    def write_backup_cfg_file(self, client, core_api, volume_name, backup_name, backup_cfg_data): # NOQA
+        secret_name = self.get_secret()
+        assert secret_name != ''
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+        bucket_name = self.get_backupstore_bucket_name(client)
+        minio_backup_cfg_file_path = self.get_backup_cfg_file_path(volume_name,
+                                                                   backup_name)
+
+        with tempfile.NamedTemporaryFile(delete_on_close=False) as fp:
+            fp.write(str(backup_cfg_data))
+            fp.close()
+            try:
+                with open(fp.name, mode='rb') as f:
+                    tmp_bkp_cfg_file_stat = os.stat(fp.name)
+                    minio_api.put_object(bucket_name,
+                                         minio_backup_cfg_file_path,
+                                         f,
+                                         tmp_bkp_cfg_file_stat.st_size)
+            except ResponseError as err:
+                print(err)
+
+    def delete_file_in_backupstore(self, client, core_api, file_path):
+
+        secret_name = self.get_secret()
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+        bucket_name = self.get_backupstore_bucket_name(client)
+
+        try:
+            minio_api.remove_object(bucket_name, file_path)
+        except ResponseError as err:
+            print(err)
+
+    def delete_backup_cfg_file(self, client, core_api, volume_name, backup_name):
+        secret_name = self.get_secret()
+        assert secret_name != ''
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+        bucket_name = self.get_backupstore_bucket_name(client)
+        minio_backup_cfg_file_path = self.get_backup_cfg_file_path(volume_name,
+                                                                   backup_name)
+
+        try:
+            minio_api.remove_object(bucket_name, minio_backup_cfg_file_path)
+        except ResponseError as err:
+            print(err)
+
+    def delete_volume_cfg_file(self, client, core_api, volume_name):
+        secret_name = self.get_secret()
+        assert secret_name != ''
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+        bucket_name = self.get_backupstore_bucket_name(client)
+        minio_volume_cfg_file_path = self.get_volume_cfg_file_path(volume_name)
+
+        try:
+            minio_api.remove_object(bucket_name, minio_volume_cfg_file_path)
+        except ResponseError as err:
+            print(err)
+
+    def delete_random_backup_block(self, client, core_api, volume_name):
+        secret_name = self.get_secret()
+        assert secret_name != ''
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+
+        bucket_name = self.get_backupstore_bucket_name(client)
+        backup_blocks_dir = self.get_backup_blocks_dir(volume_name)
+
+        block_object_files = minio_api.list_objects(bucket_name,
+                                                    prefix=backup_blocks_dir,
+                                                    recursive=True)
+
+        object_file = block_object_files.__next__().object_name
+
+        try:
+            minio_api.remove_object(bucket_name, object_file)
+        except ResponseError as err:
+            print(err)
+
+    def count_backup_block_files(self, client, core_api, volume_name):
+        secret_name = self.get_secret()
+        assert secret_name != ''
+
+        minio_api = self.get_api_client(client, core_api, secret_name)
+        bucket_name = self.get_backupstore_bucket_name(client)
+        backup_blocks_dir = self.get_backup_blocks_dir(volume_name)
+
+        block_object_files = minio_api.list_objects(bucket_name,
+                                                    prefix=backup_blocks_dir,
+                                                    recursive=True)
+
+        block_object_files_list = list(block_object_files)
+
+        return len(block_object_files_list)
diff --git a/e2e/libs/backupstore/nfs.py b/e2e/libs/backupstore/nfs.py
new file mode 100644
index 0000000000..162dbde7fc
--- /dev/null
+++ b/e2e/libs/backupstore/nfs.py
@@ -0,0 +1,114 @@
+from backupstore.base import Base
+import os
+import subprocess
+from urllib.parse import urlparse
+
+class Nfs(Base):
+
+    def mount_nfs_backupstore(self, client, mount_path="/mnt/nfs"):
+        cmd = ["mkdir", "-p", mount_path]
+        subprocess.check_output(cmd)
+        nfs_backuptarget = self.get_backup_target()
+        nfs_url = urlparse(nfs_backuptarget).netloc + \
+            urlparse(nfs_backuptarget).path
+        cmd = ["mount", "-t", "nfs", "-o", "nfsvers=4.2", nfs_url, mount_path]
+        subprocess.check_output(cmd)
+
+    def umount_nfs_backupstore(self, client, mount_path="/mnt/nfs"):
+        cmd = ["umount", mount_path]
+        subprocess.check_output(cmd)
+        cmd = ["rmdir", mount_path]
+        subprocess.check_output(cmd)
+
+    def get_nfs_mount_point(self, client):
+        nfs_backuptarget = self.get_backup_target()
+        nfs_url = urlparse(nfs_backuptarget).netloc + \
+            urlparse(nfs_backuptarget).path
+
+        cmd = ["findmnt", "-t", "nfs4", "-n", "--output", "source,target"]
+        stdout = subprocess.run(cmd, capture_output=True).stdout
+        mount_info = stdout.decode().strip().split(" ")
+
+        assert mount_info[0] == nfs_url
+        return mount_info[1]
+
+    def get_backup_volume_prefix(self, client, volume_name):
+        mount_point = self.get_nfs_mount_point(client)
+        return mount_point + self.backup_volume_path(volume_name)
+
+    def get_backup_cfg_file_path(self, client, volume_name, backup_name):
+        prefix = self.get_backup_volume_prefix(client, volume_name)
+        return prefix + "/backups/backup_" + backup_name + ".cfg"
+
+    def get_volume_cfg_file_path(self, client, volume_name):
+        prefix = self.get_backup_volume_prefix(client, volume_name)
+        return prefix + "/volume.cfg"
+
+    def get_backup_blocks_dir(self, client, volume_name):
+        prefix = self.get_backup_volume_prefix(client, volume_name)
+        return prefix + "/blocks"
+
+    def create_file_in_backupstore(self, file_path, data={}):
+        with open(file_path, 'w') as cfg_file:
+            cfg_file.write(str(data))
+
+    def write_backup_cfg_file(self, client, volume_name, backup_name, data):
+        nfs_backup_cfg_file_path = self.get_backup_cfg_file_path(client,
+                                                                 volume_name,
+                                                                 backup_name)
+        with open(nfs_backup_cfg_file_path, 'w') as cfg_file:
+            cfg_file.write(str(data))
+
+    def delete_file_in_backupstore(self, file_path):
+        try:
+            os.remove(file_path)
+        except Exception as ex:
+            print("error while deleting file:",
+                  file_path)
+            print(ex)
+
+    def delete_backup_cfg_file(self, client, volume_name, backup_name):
+        nfs_backup_cfg_file_path = self.get_backup_cfg_file_path(client,
+                                                                 volume_name,
+                                                                 backup_name)
+        try:
+            os.remove(nfs_backup_cfg_file_path)
+        except Exception as ex:
+            print("error while deleting backup cfg file:",
+                  nfs_backup_cfg_file_path)
+            print(ex)
+
+    def delete_volume_cfg_file(self, client, volume_name):
+        nfs_volume_cfg_path = self.get_volume_cfg_file_path(client, volume_name)
+        try:
+            os.remove(nfs_volume_cfg_path)
+        except Exception as ex:
+            print("error while deleting backup cfg file:", nfs_volume_cfg_path)
+            print(ex)
+
+    def delete_random_backup_block(self, client, volume_name):
+        backup_blocks_dir = self.get_backup_blocks_dir(client, volume_name)
+        cmd = ["find", backup_blocks_dir, "-type", "f"]
+        find_cmd = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        head_cmd = subprocess.check_output(["/usr/bin/head", "-1"], stdin=find_cmd.stdout)
+        backup_block_file_path = head_cmd.decode().strip()
+
+        try:
+            os.remove(backup_block_file_path)
+        except Exception as ex:
+            print("error while deleting backup block file:",
+                  backup_block_file_path)
+            print(ex)
+
+    def count_backup_block_files(self, client, volume_name):
+        backup_blocks_dir = self.get_backup_blocks_dir(client, volume_name)
+        cmd = ["find", backup_blocks_dir, "-type", "f"]
+        find_cmd = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+        wc_cmd = subprocess.check_output(["/usr/bin/wc", "-l"], stdin=find_cmd.stdout)
+        backup_blocks_count = int(wc_cmd.decode().strip())
+
+        return backup_blocks_count
+
+    def cleanup_backup_volumes(self, client):
+        super().cleanup_backup_volumes(client)
+        self.umount_nfs_backupstore(client)
diff --git a/e2e/libs/keywords/backupstore_keywords.py b/e2e/libs/keywords/backupstore_keywords.py
new file mode 100644
index 0000000000..98e847dd49
--- /dev/null
+++ b/e2e/libs/keywords/backupstore_keywords.py
@@ -0,0 +1,58 @@
+from backupstore import Nfs, Minio
+from utility.utility import get_longhorn_client
+from kubernetes import client
+from setting import Setting
+
+
+class backupstore_keywords:
+
+    def __init__(self):
+        backupstores = Minio.get_backupstores()
+        if backupstores[0] == "s3":
+            self.backupstore = Minio()
+        else:
+            self.backupstore = Nfs()
+        self.setting = Setting()
+
+    def set_backupstore(self):
+        self.setting.set_backupstore()
+
+    def cleanup_backupstore(self):
+        client = get_longhorn_client()
+        self.backupstore.cleanup_system_backups(client)
+        self.backupstore.cleanup_backup_volumes(client)
+        self.setting.reset_backupstore_setting()
+
+    def create_dummy_in_progress_backup(self, volume_name):
+        client = get_longhorn_client()
+        core_api = client.CoreV1Api()
+
+        dummy_backup_cfg_data = {"Name": "dummy_backup",
+                                 "VolumeName": volume_name,
+                                 "CreatedTime": ""}
+
+        self.backupstore.write_backup_cfg_file(client,
+                                               core_api,
+                                               volume_name,
+                                               "backup-dummy",
+                                               dummy_backup_cfg_data)
+
+    def delete_dummy_in_progress_backup(self, volume_name):
+        client = get_longhorn_client()
+        core_api = client.CoreV1Api()
+        self.backupstore.delete_backup_cfg_file(client,
+                                                core_api,
+                                                volume_name,
+                                                "backup-dummy")
+
+    def corrupt_backup_cfg_file(self, volume_name, backup_name):
+        client = get_longhorn_client()
+        core_api = client.CoreV1Api()
+
+        corrupt_backup_cfg_data = "{corrupt: definitely"
+
+        self.backupstore.write_backup_cfg_file(client,
+                                               core_api,
+                                               volume_name,
+                                               backup_name,
+                                               corrupt_backup_cfg_data)
diff --git a/e2e/libs/longhorn.py b/e2e/libs/longhorn.py
index aea50f2dd8..5afcfe98e3 100644
--- a/e2e/libs/longhorn.py
+++ b/e2e/libs/longhorn.py
@@ -270,6 +270,8 @@ def cb(_link_name=link_name,
 
             return result
 
+        if type(obj) == str and '/v1/' in obj:
+            obj = self._url.replace("/v1/schemas", "") + obj[obj.find("/v1/"):]
         return obj
 
     def object_pairs_hook(self, pairs):
diff --git a/e2e/libs/recurringjob/recurringjob.py b/e2e/libs/recurringjob/recurringjob.py
index 1029e20064..8c2101c592 100644
--- a/e2e/libs/recurringjob/recurringjob.py
+++ b/e2e/libs/recurringjob/recurringjob.py
@@ -15,11 +15,14 @@ def __init__(self):
         else:
             self.recurringjob = Rest()
 
+    # Pushing a backup and a purge once a minute may be a little aggressive
+    # Changed to every 2 minutes
+    # ref: https://github.com/longhorn/longhorn/issues/7854
     def create(self,
                job_name,
                task="snapshot",
                groups=[],
-               cron="* * * * *",
+               cron="*/2 * * * *",
                retain=1,
                concurrency=1,
                labels={}):
diff --git a/e2e/libs/recurringjob/rest.py b/e2e/libs/recurringjob/rest.py
index ec35b8ab46..1a42767f3b 100644
--- a/e2e/libs/recurringjob/rest.py
+++ b/e2e/libs/recurringjob/rest.py
@@ -11,6 +11,7 @@
 from utility.utility import filter_cr
 from utility.utility import get_longhorn_client
 from utility.utility import logging
+from utility.utility import get_retry_count_and_interval
 
 
 class Rest(Base):
@@ -18,6 +19,7 @@ class Rest(Base):
     def __init__(self):
         self.longhorn_client = get_longhorn_client()
         self.batch_v1_api = client.BatchV1Api()
+        self.retry_count, self.retry_interval = get_retry_count_and_interval()
 
     def create(self, name, task, groups, cron, retain, concurrency, labels):
         self.longhorn_client.create_recurring_job(
@@ -45,139 +47,135 @@ def add_to_volume(self, job_name, volume_name):
 
     def _wait_for_volume_recurringjob_update(self, job_name, volume_name):
         updated = False
-        for _ in range(RETRY_COUNTS):
+        for _ in range(self.retry_count):
             jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name)
             if job_name in jobs:
                 updated = True
                 break
-            time.sleep(RETRY_INTERVAL)
+            time.sleep(self.retry_interval)
         assert updated
 
     def _wait_for_volume_recurringjob_delete(self, job_name, volume_name):
         deleted = False
-        for _ in range(RETRY_COUNTS):
+        for _ in range(self.retry_count):
             jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name)
             if job_name not in jobs:
                 deleted = True
                 break
-            time.sleep(RETRY_INTERVAL)
+            time.sleep(self.retry_interval)
         assert deleted
 
     def get_volume_recurringjobs_and_groups(self, volume_name):
-        volume = self.longhorn_client.by_id_volume(volume_name)
-        list = volume.recurringJobList()
-        jobs = []
-        groups = []
-        for item in list:
-            if item['isGroup']:
-                groups.append(item['name'])
-            else:
-                jobs.append(item['name'])
-        return jobs, groups
+        for _ in range(self.retry_count):
+            volume = None
+            try:
+                volume = self.longhorn_client.by_id_volume(volume_name)
+                list = volume.recurringJobList()
+                jobs = []
+                groups = []
+                for item in list:
+                    if item['isGroup']:
+                        groups.append(item['name'])
+                    else:
+                        jobs.append(item['name'])
+                return jobs, groups
+            except Exception as e:
+                logging(f"Getting volume {volume} recurringjob list error: {e}")
+                time.sleep(self.retry_interval)
 
     def _wait_for_cron_job_create(self, job_name):
         created = False
-        for _ in range(RETRY_COUNTS):
+        for _ in range(self.retry_count):
             job = self.batch_v1_api.list_namespaced_cron_job(
                 'longhorn-system',
                 label_selector=f"recurring-job.longhorn.io={job_name}")
             if len(job.items) != 0:
                 created = True
                 break
-            time.sleep(RETRY_INTERVAL)
+            time.sleep(self.retry_interval)
         assert created
 
     def _wait_for_cron_job_delete(self, job_name):
         deleted = False
-        for _ in range(RETRY_COUNTS):
+        for _ in range(self.retry_count):
             job = self.batch_v1_api.list_namespaced_cron_job(
                 'longhorn-system',
                 label_selector=f"recurring-job.longhorn.io={job_name}")
             if len(job.items) == 0:
                 deleted = True
                 break
-            time.sleep(RETRY_INTERVAL)
+            time.sleep(self.retry_interval)
         assert deleted
 
     def check_jobs_work(self, volume_name):
-        # check if snapshot/backup is really created by the
-        # recurringjob following the cron schedule
-        # currently only support checking normal snapshot/backup
-        # every 1 min recurringjob
         jobs, _ = self.get_volume_recurringjobs_and_groups(volume_name)
         for job_name in jobs:
             job = self.get(job_name)
             logging(f"Checking recurringjob {job}")
-            if job['task'] == 'snapshot' and job['cron'] == '* * * * *':
-                period_in_sec = 60
-                self._check_snapshot_created_in_time(volume_name, job_name, period_in_sec)
-            elif job['task'] == 'backup' and job['cron'] == '* * * * *':
-                period_in_sec = 60
-                self._check_backup_created_in_time(volume_name, period_in_sec)
-
-    def _check_snapshot_created_in_time(self, volume_name, job_name, period_in_sec):
+            if job['task'] == 'snapshot':
+                self._check_snapshot_created(volume_name, job_name)
+            elif job['task'] == 'backup':
+                self._check_backup_created(volume_name)
+
+    def _check_snapshot_created(self, volume_name, job_name):
         # check snapshot can be created by the recurringjob
         current_time = datetime.utcnow()
         current_timestamp = current_time.timestamp()
-
         label_selector=f"longhornvolume={volume_name}"
-
-        max_iterations = period_in_sec * 10
-        for _ in range(max_iterations):
-            time.sleep(1)
-
+        snapshot_timestamp = 0
+        for i in range(self.retry_count):
+            logging(f"Waiting for {volume_name} new snapshot created ({i}) ...")
             snapshot_list = filter_cr("longhorn.io", "v1beta2", "longhorn-system", "snapshots", label_selector=label_selector)
-
-            if snapshot_list['items'] is None:
-                continue
-
-            for item in snapshot_list['items']:
-                # this snapshot can be created by snapshot or backup recurringjob
-                # but job_name is in spec.labels.RecurringJob
-                # and crd doesn't support field selector
-                # so need to filter by ourselves
-                if item['spec']['labels'] is None:
-                    continue
-
-                try:
-                    assert item['spec']['labels']['RecurringJob'] == job_name
-                except AssertionError:
-                    continue
-
-                snapshot_timestamp = datetime.strptime(snapshot_list['items'][0]['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ').timestamp()
-
-                if snapshot_timestamp > current_timestamp:
-                    return
-
-                logging(f"Snapshot {item['metadata']['name']} timestamp = {snapshot_timestamp} is not greater than {current_timestamp}")
-
-        assert False, f"No new snapshot created by recurringjob {job_name} for {volume_name} since {current_time}"
-
-    def _check_backup_created_in_time(self, volume_name, period_in_sec):
+            try:
+                if len(snapshot_list['items']) > 0:
+                    for item in snapshot_list['items']:
+                        # this snapshot can be created by snapshot or backup recurringjob
+                        # but job_name is in spec.labels.RecurringJob
+                        # and crd doesn't support field selector
+                        # so need to filter by ourselves
+                        if 'RecurringJob' in item['status']['labels'] and \
+                            item['status']['labels']['RecurringJob'] == job_name and \
+                            item['status']['readyToUse'] == True:
+                            snapshot_time = item['metadata']['creationTimestamp']
+                            snapshot_time = datetime.strptime(snapshot_time, '%Y-%m-%dT%H:%M:%SZ')
+                            snapshot_timestamp = snapshot_time.timestamp()
+                        if snapshot_timestamp > current_timestamp:
+                            logging(f"Got snapshot {item}, create time = {snapshot_time}, timestamp = {snapshot_timestamp}")
+                            return
+            except Exception as e:
+                logging(f"Iterating snapshot list error: {e}")
+            time.sleep(self.retry_interval)
+        assert False, f"since {current_time},\
+                        there's no new snapshot created by recurringjob \
+                        {snapshot_list}"
+
+    def _check_backup_created(self, volume_name):
         # check backup can be created by the recurringjob
         current_time = datetime.utcnow()
         current_timestamp = current_time.timestamp()
-
         label_selector=f"backup-volume={volume_name}"
-
-        max_iterations = period_in_sec * 10
-        for _ in range(max_iterations):
-            time.sleep(1)
-
+        backup_timestamp = 0
+        for i in range(self.retry_count):
+            logging(f"Waiting for {volume_name} new backup created ({i}) ...")
             backup_list = filter_cr("longhorn.io", "v1beta2", "longhorn-system", "backups", label_selector=label_selector)
-
-            if backup_list['items'] is None:
-                continue
-
-            for item in backup_list['items']:
-                backup_timestamp = datetime.strptime(item['metadata']['creationTimestamp'], '%Y-%m-%dT%H:%M:%SZ').timestamp()
-
-                if backup_timestamp > current_timestamp:
-                    return
-
-                logging(f"Backup {item['metadata']['name']} timestamp = {backup_timestamp} is not greater than {current_timestamp}")
-
-        assert False, f"No new backup created by recurringjob for {volume_name} since {current_time}"
+            try:
+                if len(backup_list['items']) > 0:
+                    for item in backup_list['items']:
+                        state = item['status']['state']
+                        if state != "Completed":
+                            continue
+                        backup_time = item['metadata']['creationTimestamp']
+                        backup_time = datetime.strptime(backup_time, '%Y-%m-%dT%H:%M:%SZ')
+                        backup_timestamp = backup_time.timestamp()
+                        if backup_timestamp > current_timestamp:
+                            logging(f"Got backup {item}, create time = {backup_time}, timestamp = {backup_timestamp}")
+                            return
+            except Exception as e:
+                logging(f"Iterating backup list error: {e}")
+            time.sleep(self.retry_interval)
+        assert False, f"since {current_time},\
+                        there's no new backup created by recurringjob \
+                        {backup_list}"
 
     def cleanup(self, volume_names):
         for volume_name in volume_names:
diff --git a/e2e/libs/setting/__init__.py b/e2e/libs/setting/__init__.py
new file mode 100644
index 0000000000..abd0fa7375
--- /dev/null
+++ b/e2e/libs/setting/__init__.py
@@ -0,0 +1 @@
+from setting.setting import Setting
diff --git a/e2e/libs/setting/constant.py b/e2e/libs/setting/constant.py
new file mode 100644
index 0000000000..5f4ca0d73c
--- /dev/null
+++ b/e2e/libs/setting/constant.py
@@ -0,0 +1,3 @@
+SETTING_BACKUP_TARGET = "backup-target"
+SETTING_BACKUP_TARGET_CREDENTIAL_SECRET = "backup-target-credential-secret"
+SETTING_BACKUPSTORE_POLL_INTERVAL = "backupstore-poll-interval"
diff --git a/e2e/libs/setting/setting.py b/e2e/libs/setting/setting.py
new file mode 100644
index 0000000000..191f360ae0
--- /dev/null
+++ b/e2e/libs/setting/setting.py
@@ -0,0 +1,73 @@
+import os
+from utility.utility import get_longhorn_client
+from setting.constant import SETTING_BACKUP_TARGET
+from setting.constant import SETTING_BACKUP_TARGET_CREDENTIAL_SECRET
+from setting.constant import SETTING_BACKUPSTORE_POLL_INTERVAL
+
+class Setting:
+
+    def __init__(self):
+        self.longhorn_client = get_longhorn_client()
+
+    def get_backupstore_url(self):
+        backupstore = os.environ['LONGHORN_BACKUPSTORES']
+        backupstore = backupstore.replace(" ", "")
+        backupstores = backupstore.split(",")
+        assert len(backupstores) != 0
+        return backupstores
+
+    def get_backupstore_poll_interval(self):
+        poll_interval = os.environ['LONGHORN_BACKUPSTORE_POLL_INTERVAL']
+        assert len(poll_interval) != 0
+        return poll_interval
+
+    def set_backupstore(self):
+        backupstores = self.get_backupstore_url()
+        poll_interval = self.get_backupstore_poll_interval()
+        for backupstore in backupstores:
+            backupsettings = backupstore.split("$")
+            self.set_backupstore_url(backupsettings[0])
+            self.set_backupstore_credential_secret(backupsettings[1])
+            self.set_backupstore_poll_interval(poll_interval)
+            break
+
+    def reset_backupstore_setting(self):
+        backup_target_setting = self.longhorn_client.by_id_setting(SETTING_BACKUP_TARGET)
+        self.longhorn_client.update(backup_target_setting, value="")
+
+        backup_target_credential_setting = self.longhorn_client.by_id_setting(
+            SETTING_BACKUP_TARGET_CREDENTIAL_SECRET)
+        self.longhorn_client.update(backup_target_credential_setting, value="")
+
+        backup_store_poll_interval = self.longhorn_client.by_id_setting(
+            SETTING_BACKUPSTORE_POLL_INTERVAL)
+        self.longhorn_client.update(backup_store_poll_interval, value="300")
+
+    def set_backupstore_url(self, url):
+        backup_target_setting = self.longhorn_client.by_id_setting(SETTING_BACKUP_TARGET)
+        backup_target_setting = self.longhorn_client.update(backup_target_setting,
+                                                            value=url)
+        assert backup_target_setting.value == url
+
+    def set_backupstore_credential_secret(self, credential_secret):
+        backup_target_credential_setting = self.longhorn_client.by_id_setting(
+            SETTING_BACKUP_TARGET_CREDENTIAL_SECRET)
+        backup_target_credential_setting = self.longhorn_client.update(
+            backup_target_credential_setting, value=credential_secret)
+        assert backup_target_credential_setting.value == credential_secret
+
+    def set_backupstore_poll_interval(self, poll_interval):
+        backup_store_poll_interval_setting = self.longhorn_client.by_id_setting(
+            SETTING_BACKUPSTORE_POLL_INTERVAL)
+        backup_target_poll_interal_setting = self.longhorn_client.update(
+            backup_store_poll_interval_setting, value=poll_interval)
+        assert backup_target_poll_interal_setting.value == poll_interval
+
+    def get_backup_target(self):
+        backup_target_setting = self.longhorn_client.by_id_setting(SETTING_BACKUP_TARGET)
+        return backup_target_setting.value
+
+    def get_secret(self):
+        backup_target_credential_setting = self.longhorn_client.by_id_setting(
+            SETTING_BACKUP_TARGET_CREDENTIAL_SECRET)
+        return backup_target_credential_setting.value
diff --git a/e2e/requirements.txt b/e2e/requirements.txt
index 13311fde65..74b4b9fe6b 100644
--- a/e2e/requirements.txt
+++ b/e2e/requirements.txt
@@ -6,3 +6,4 @@ kubernetes==27.2.0
 requests==2.31.0
 boto3==1.34.79
 pyyaml==6.0.1
+minio==5.0.10
diff --git a/pipelines/utilities/run_longhorn_e2e_test.sh b/pipelines/utilities/run_longhorn_e2e_test.sh
index e79f5012c9..9c0da7a84f 100755
--- a/pipelines/utilities/run_longhorn_e2e_test.sh
+++ b/pipelines/utilities/run_longhorn_e2e_test.sh
@@ -66,6 +66,13 @@ run_longhorn_e2e_test(){
 
 run_longhorn_e2e_test_out_of_cluster(){
 
+  if [[ ${BACKUP_STORE_TYPE} == "s3" ]]; then
+    LONGHORN_BACKUPSTORES='s3://backupbucket@us-east-1/backupstore$minio-secret'
+  elif [[ $BACKUP_STORE_TYPE = "nfs" ]]; then
+    LONGHORN_BACKUPSTORES='nfs://longhorn-test-nfs-svc.default:/opt/backupstore'
+  fi
+  LONGHORN_BACKUPSTORE_POLL_INTERVAL="30"
+
   eval "ROBOT_COMMAND_ARGS=($PYTEST_CUSTOM_OPTIONS)"
 
   if [[ -n ${LONGHORN_TESTS_CUSTOM_IMAGE} ]]; then
@@ -74,6 +81,8 @@ run_longhorn_e2e_test_out_of_cluster(){
     CONTAINER_NAME="e2e-container-${IMAGE_NAME}"
     docker run --pull=always \
                --name "${CONTAINER_NAME}" \
+               -e LONGHORN_BACKUPSTORES="${LONGHORN_BACKUPSTORES}" \
+               -e LONGHORN_BACKUPSTORE_POLL_INTERVAL="${LONGHORN_BACKUPSTORE_POLL_INTERVAL}" \
                -e AWS_ACCESS_KEY_ID="${TF_VAR_lh_aws_access_key}" \
                -e AWS_SECRET_ACCESS_KEY="${TF_VAR_lh_aws_secret_key}" \
                -e AWS_DEFAULT_REGION="${TF_VAR_aws_region}" \
@@ -84,6 +93,8 @@ run_longhorn_e2e_test_out_of_cluster(){
     docker stop "${CONTAINER_NAME}"
     docker rm "${CONTAINER_NAME}"
   else
+    export LONGHORN_BACKUPSTORES=${LONGHORN_BACKUPSTORES}
+    export LONGHORN_BACKUPSTORE_POLL_INTERVAL=${LONGHORN_BACKUPSTORE_POLL_INTERVAL}
     cd e2e
     python3 -m venv .
     source bin/activate