From 520372f3f1478e36c19e514a3104b73c0e4e6e9c Mon Sep 17 00:00:00 2001 From: shen yushi Date: Thu, 31 Oct 2024 13:03:40 +0800 Subject: [PATCH] Docker cluster (#2118) ### What problem does this PR solve? Add docker cluster test. ### Type of change - [x] Test cases --- .github/workflows/slow_test.yml | 37 ++- .github/workflows/tests.yml | 72 ++-- .gitignore | 1 + python/infinity_http.py | 275 ++++++++-------- python/test_cluster/clear_docker.py | 37 +++ python/test_cluster/conftest.py | 57 +++- .../test_cluster/docker_infinity_cluster.py | 311 ++++++++++++++++++ python/test_cluster/infinity_cluster.py | 148 +++++++-- .../test_cluster/mocked_infinity_cluster.py | 32 +- python/test_cluster/requirements.txt | 4 +- python/test_cluster/test_basic.py | 53 ++- python/test_cluster/test_insert.py | 68 ++++ scripts/Dockerfile_infinity_builder_centos7 | 7 +- scripts/timeout_kill.sh | 6 +- src/storage/storage.cpp | 1 + tools/run_cluster_test.py | 41 ++- 16 files changed, 881 insertions(+), 269 deletions(-) create mode 100644 python/test_cluster/clear_docker.py create mode 100644 python/test_cluster/docker_infinity_cluster.py create mode 100644 python/test_cluster/test_insert.py diff --git a/.github/workflows/slow_test.yml b/.github/workflows/slow_test.yml index 4efe0fe04f..d98f5da5fb 100644 --- a/.github/workflows/slow_test.yml +++ b/.github/workflows/slow_test.yml @@ -65,22 +65,27 @@ jobs: sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-sdk && cd python/infinity_sdk/ && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd ../.." sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-embedded-sdk && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" - - name: Prepare cluster test - if: ${{ !cancelled() && !failure() }} - run : | - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/test_cluster/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" - sudo docker exec ${BUILDER_CONTAINER} bash -c "yum install sudo iproute2 bridge-utils -y" - - - name: Run cluster test - if: ${{ !cancelled() && !failure() }} - id: run_cluster_test - run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity" - - - name: Collect cluster test output - if: ${{ !cancelled() }} - run: | - failure="${{ steps.run_cluster_test.outcome == 'failure'}}" - sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-release/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} + # - name: Prepare cluster test + # if: ${{ !cancelled() && !failure() }} + # run : | + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/test_cluster/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" + # # sudo docker exec ${BUILDER_CONTAINER} bash -c "yum install sudo iproute2 bridge-utils -y" + # # sudo apt install virtualenv -y + # # virtualenv -p python3 .venv + # # source .venv/bin/activate && pip3 install pytest && pip3 install -r python/test_cluster/requirements.txt + + # - name: Run cluster test + # if: ${{ !cancelled() && !failure() }} + # id: run_cluster_test + # run: | + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity" + # # source .venv/bin/activate && sudo python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity --docker + + # - name: Collect cluster test output + # if: ${{ !cancelled() }} + # run: | + # failure="${{ steps.run_cluster_test.outcome == 'failure'}}" + # sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-release/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - name: Prepare restart test data if: ${{ !cancelled() && !failure() }} diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index bad5e1f8e4..01f10b5e61 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -64,22 +64,27 @@ jobs: if: ${{ !cancelled() && !failure() }} run: sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-sdk infinity-embedded-sdk && pip3 install . -v --config-settings=cmake.build-type='Debug' --config-settings=build-dir='cmake-build-debug' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd python/infinity_sdk/ && pip3 install . -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd ../.." - - name: Prepare cluster test - if: ${{ !cancelled() && !failure() }} - run : | - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/test_cluster/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" - sudo docker exec ${BUILDER_CONTAINER} bash -c "yum install sudo iproute2 bridge-utils -y" + # - name: Prepare cluster test + # if: ${{ !cancelled() && !failure() }} + # run : | + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/test_cluster/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" + # # sudo docker exec ${BUILDER_CONTAINER} bash -c "yum install sudo iproute2 bridge-utils -y" + # # sudo apt install virtualenv -y + # # virtualenv -p python3 .venv + # # source .venv/bin/activate && pip3 install pytest && pip3 install -r python/test_cluster/requirements.txt - - name: Run cluster test - if: ${{ !cancelled() && !failure() }} - id: run_cluster_test - run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity" - - - name: Collect cluster test output - if: ${{ !cancelled() }} - run: | - failure="${{ steps.run_cluster_test.outcome == 'failure'}}" - sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-debug/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} + # - name: Run cluster test + # if: ${{ !cancelled() && !failure() }} + # id: run_cluster_test + # run: | + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity" + # # source .venv/bin/activate && python3 tools/run_cluster_test.py --infinity_path=cmake-build-debug/src/infinity --docker + + # - name: Collect cluster test output + # if: ${{ !cancelled() }} + # run: | + # failure="${{ steps.run_cluster_test.outcome == 'failure'}}" + # sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-debug/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - name: Prepare restart test data if: ${{ !cancelled() && !failure() }} @@ -237,22 +242,27 @@ jobs: if: ${{ !cancelled() && !failure() }} run: sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-sdk infinity-embedded-sdk && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd python/infinity_sdk/ && pip3 install . -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd ../.." - - name: Prepare cluster test - if: ${{ !cancelled() && !failure() }} - run : | - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/test_cluster/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" - sudo docker exec ${BUILDER_CONTAINER} bash -c "yum install sudo iproute2 bridge-utils -y" - - - name: Run cluster test - if: ${{ !cancelled() && !failure() }} - id: run_cluster_test - run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity" - - - name: Collect cluster test output - if: ${{ !cancelled() }} - run: | - failure="${{ steps.run_cluster_test.outcome == 'failure'}}" - sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-release/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} + # - name: Prepare cluster test + # if: ${{ !cancelled() && !failure() }} + # run : | + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/test_cluster/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" + # # sudo docker exec ${BUILDER_CONTAINER} bash -c "yum install sudo iproute2 bridge-utils -y" + # # sudo apt install virtualenv -y + # # virtualenv -p python3 .venv + # # source .venv/bin/activate && pip3 install pytest && pip3 install -r python/test_cluster/requirements.txt + + # - name: Run cluster test + # if: ${{ !cancelled() && !failure() }} + # id: run_cluster_test + # run: + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity" + # # source .venv/bin/activate && python3 tools/run_cluster_test.py --infinity_path=cmake-build-release/src/infinity --docker + + # - name: Collect cluster test output + # if: ${{ !cancelled() }} + # run: | + # failure="${{ steps.run_cluster_test.outcome == 'failure'}}" + # sudo python3 scripts/collect_cluster_log.py --executable_path=cmake-build-release/src/infinity --log_dir=/var/infinity/ --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - name: Prepare restart test data if: ${{ !cancelled() && !failure() }} diff --git a/.gitignore b/.gitignore index 07d455ab88..89a6a777fa 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,4 @@ tmp*/ dist/ python/infinity_sdk/infinity_sdk.egg-info/ +minio/ diff --git a/python/infinity_http.py b/python/infinity_http.py index f5ac6ccf7f..2c9ede44ba 100644 --- a/python/infinity_http.py +++ b/python/infinity_http.py @@ -12,10 +12,9 @@ import polars as pl import pyarrow as pa from infinity.table import ExplainType -from datetime import date, time, datetime from typing import List -class infinity_http_network: +class http_network_util: header_dict = baseHeader response_dict = baseResponse data_dict = baseData @@ -106,9 +105,12 @@ def get_database_result(self, resp, expect={}): print(e) return database_result(error_code=e.error_code) -class infinity_http(infinity_http_network): - def __init__(self, url: str = default_url): - super().__init__(url) +class infinity_http: + def __init__(self, *, net: http_network_util = None): + if net is not None: + self.net = net + else: + self.net = http_network_util(default_url) def disconnect(self): print("disconnect") @@ -116,49 +118,47 @@ def disconnect(self): def set_role_standalone(self, node_name): url = f"admin/node/current" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], {"role": "standalone", "name": node_name}) - r = self.request(url, "post", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"role": "standalone", "name": node_name}) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) def set_role_leader(self, node_name): url = f"admin/node/current" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], {"role": "leader", "name": node_name}) - r = self.request(url, "post", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"role": "leader", "name": node_name}) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) return database_result() def set_role_follower(self, node_name, leader_addr): url = f"admin/node/current" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], {"role": "follower", "name": node_name, "address": leader_addr}) - r = self.request(url, "post", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"role": "follower", "name": node_name, "address": leader_addr}) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) return database_result() # database def create_database(self, db_name, opt=ConflictType.Error): url = f"databases/{db_name}" - h = self.set_up_header(["accept", "content-type"]) + h = self.net.set_up_header(["accept", "content-type"]) if opt in [ConflictType.Error, ConflictType.Ignore, ConflictType.Replace]: - d = self.set_up_data( + d = self.net.set_up_data( ["create_option"], {"create_option": baseCreateOptions[opt]} ) - r = self.request(url, "post", h, d) - self.raise_exception(r) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) return True else: try: - d = self.set_up_data( - ["create_option"], {"create_option": opt} - ) - r = self.request(url, "post", h, d) - self.raise_exception(r) + d = self.net.set_up_data(["create_option"], {"create_option": opt}) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) except: raise InfinityException(ErrorCode.INVALID_CONFLICT_TYPE) - # d = self.set_up_data( + # d = self.net.set_up_data( # ["create_option"], {"create_option": str(opt)} # ) # different exception type @@ -166,50 +166,52 @@ def create_database(self, db_name, opt=ConflictType.Error): def drop_database(self, db_name, opt=ConflictType.Error): url = f"databases/{db_name}" - h = self.set_up_header(["accept", "content-type"]) + h = self.net.set_up_header(["accept", "content-type"]) if opt in [ConflictType.Error, ConflictType.Ignore]: - d = self.set_up_data(["drop_option"], {"drop_option": baseDropOptions[opt]}) - r = self.request(url, "delete", h, d) - self.raise_exception(r) + d = self.net.set_up_data( + ["drop_option"], {"drop_option": baseDropOptions[opt]} + ) + r = self.net.request(url, "delete", h, d) + self.net.raise_exception(r) return database_result() else: try: - d = self.set_up_data(["drop_option"], {"drop_option": opt}) - r = self.request(url, "delete", h, d) - self.raise_exception(r) + d = self.net.set_up_data(["drop_option"], {"drop_option": opt}) + r = self.net.request(url, "delete", h, d) + self.net.raise_exception(r) except: raise InfinityException(ErrorCode.INVALID_CONFLICT_TYPE) def get_database(self, db_name, opt=ConflictType.Error): url = f"databases/{db_name}" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h, {}) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h, {}) try: - self.raise_exception(r) - return database_http(self, database_name=r.json()["database_name"]) + self.net.raise_exception(r) + return database_http(self.net, database_name=r.json()["database_name"]) except: raise InfinityException(ErrorCode.DB_NOT_EXIST) def list_databases(self): url = "databases" - self.set_up_header(["accept"]) - r = self.request(url, "get") - self.raise_exception(r) + self.net.set_up_header(["accept"]) + r = self.net.request(url, "get") + self.net.raise_exception(r) return database_result(list=r.json()["databases"]) def show_database(self, db_name): url = f"databases/{db_name}" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h, {}) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h, {}) + self.net.raise_exception(r) return database_result(database_name=r.json()["database_name"]) ####################3####################3####################3####################3####################3####################3####################3####################3 -class database_http(infinity_http_network): - def __init__(self, network_util: infinity_http_network, database_name: str): - super().__init__(network_util.base_url) +class database_http: + def __init__(self, net: http_network_util, database_name: str): + self.net = net self.database_name = database_name self._db_name = database_name @@ -239,18 +241,18 @@ def create_table( print(fields) url = f"databases/{self.database_name}/tables/{table_name}" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data( + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data( ["create_option"], { "fields": fields, "create_option": copt, }, ) - r = self.request(url, "post", h, d) - self.raise_exception(r) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) self.table_name = table_name - return table_http(self, self.database_name, table_name) + return table_http(self.net, self.database_name, table_name) def drop_table( self, @@ -264,31 +266,31 @@ def drop_table( copt = baseDropOptions[conflict_type] url = f"databases/{self.database_name}/tables/{table_name}" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data(["drop_option"], {"drop_option": copt}) - r = self.request(url, "delete", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data(["drop_option"], {"drop_option": copt}) + r = self.net.request(url, "delete", h, d) + self.net.raise_exception(r) return database_result() def list_tables(self): url = f"databases/{self.database_name}/tables" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) return database_result() def show_table(self, table_name): check_valid_name(table_name) url = f"databases/{self.database_name}/tables/{table_name}" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) # self.table_name = table_name def get_all_tables(self): url = f"databases/{self.database_name}/tables" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) # return all db names ret = [] r_json = r.json() @@ -302,10 +304,10 @@ def get_all_tables(self): def get_table(self, table_name): check_valid_name(table_name) url = f"databases/{self.database_name}/tables/{table_name}" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) - return table_http(self, self.database_name, table_name) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) + return table_http(self.net, self.database_name, table_name) # not implemented, just to pass test def show_tables(self): @@ -313,17 +315,17 @@ def show_tables(self): return database_result(columns=["database", "table", "type", "column_count", "block_count", "block_capacity", "segment_count", "segment_capacity", "comment"]) -class table_http(infinity_http_network): - def __init__(self, network_util: infinity_http_network, database_name: str, table_name: str): - super().__init__(network_util.base_url) +class table_http: + def __init__(self, net: http_network_util, database_name: str, table_name: str): + self.net = net self.database_name = database_name self.table_name = table_name def show_columns(self): url = f"databases/{self.database_name}/tables/{self.table_name}/columns" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) res = {"name": [], "type": [], "default": [], "comment": []} print(r.json()) for col in r.json()["columns"]: @@ -336,9 +338,9 @@ def show_columns(self): def show_columns_type(self): url = f"databases/{self.database_name}/tables/{self.table_name}/columns" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) res = {} for col in r.json()["columns"]: res[col["name"]] = col["type"] @@ -370,14 +372,14 @@ def create_index( # print(create_index_info) url = f"databases/{self.database_name}/tables/{self.table_name}/indexes/{index_name}" - h = self.set_up_header( + h = self.net.set_up_header( ["accept", "content-type"], ) - d = self.set_up_data( + d = self.net.set_up_data( ["create_option"], {"fields": fields, "index": create_index_info, "create_option": copt} ) - r = self.request(url, "post", h, d) - self.raise_exception(r) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) return database_result() def drop_index( @@ -393,24 +395,24 @@ def drop_index( url = f"databases/{self.database_name}/tables/{self.table_name}/indexes/{index_name}" - h = self.set_up_header(["accept"]) - d = self.set_up_data([], {"drop_option": copt}) - r = self.request(url, "delete", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + d = self.net.set_up_data([], {"drop_option": copt}) + r = self.net.request(url, "delete", h, d) + self.net.raise_exception(r) return database_result() def show_index(self, index_name): url = f"databases/{self.database_name}/tables/{self.table_name}/indexes/{index_name}" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) return database_result() def list_indexes(self): url = f"databases/{self.database_name}/tables/{self.table_name}/indexes" - h = self.set_up_header(["accept"]) - r = self.request(url, "get", h) - self.raise_exception(r) + h = self.net.set_up_header(["accept"]) + r = self.net.request(url, "get", h) + self.net.raise_exception(r) r_json = r.json() index_list = [] exists = r_json.get("tables", None) @@ -421,12 +423,12 @@ def list_indexes(self): def optimize(self, index_name="", optimize_options={}): url = f"databases/{self.database_name}/tables/{self.table_name}/indexes/{index_name}" - h = self.set_up_header( + h = self.net.set_up_header( ["accept", "content-type"], ) opt_opt = {"optimize_options": optimize_options} - r = self.request(url, "put", h, opt_opt) - self.raise_exception(r) + r = self.net.request(url, "put", h, opt_opt) + self.net.raise_exception(r) return database_result() def insert(self, values=[]): @@ -449,9 +451,9 @@ def insert(self, values=[]): value[key] = value[key].to_dict() url = f"databases/{self.database_name}/tables/{self.table_name}/docs" - h = self.set_up_header(["accept", "content-type"]) - r = self.request(url, "post", h, values) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + r = self.net.request(url, "post", h, values) + self.net.raise_exception(r) return database_result() def import_data(self, data_path="/home/infiniflow/Documents/development/infinity/test/data/csv/pysdk_test.csv", @@ -470,10 +472,10 @@ def import_data(self, data_path="/home/infiniflow/Documents/development/infinity data["delimiter"] = import_options["delimiter"] url = f"databases/{self.database_name}/tables/{self.table_name}" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], data) - r = self.request(url, "put", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], data) + r = self.net.request(url, "put", h, d) + self.net.raise_exception(r) return database_result() def export_data(self, data_path="", export_options={}, columns=[]): @@ -498,59 +500,60 @@ def export_data(self, data_path="", export_options={}, columns=[]): data["columns"] = columns url = f"databases/{self.database_name}/table/{self.table_name}" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], data) - r = self.request(url, "get", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], data) + r = self.net.request(url, "get", h, d) + self.net.raise_exception(r) return database_result() def add_columns(self, columns_definition={}): url = f"databases/{self.database_name}/tables/{self.table_name}/columns" - h = self.set_up_header(["accept", "content-type"]) + h = self.net.set_up_header(["accept", "content-type"]) fields = [] for col in columns_definition: tmp = {"name": col} for param_name in columns_definition[col]: tmp[param_name.lower()] = columns_definition[col][param_name] fields.append(tmp) - d = self.set_up_data([], {"fields": fields}) - r = self.request(url, "post", h, d) - return self.get_database_result(r) + d = self.net.set_up_data([], {"fields": fields}) + r = self.net.request(url, "post", h, d) + return self.net.get_database_result(r) def drop_columns(self, column_name: list[str] | str): if isinstance(column_name, str): column_name = [column_name] url = f"databases/{self.database_name}/tables/{self.table_name}/columns" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], {"column_names": column_name}) - r = self.request(url, "delete", h, d) - return self.get_database_result(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"column_names": column_name}) + r = self.net.request(url, "delete", h, d) + return self.net.get_database_result(r) def output( - self, - output=[], + self, + output=[], ): return table_http_result(output, self) def delete(self, filter=""): url = f"databases/{self.database_name}/tables/{self.table_name}/docs" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], {"filter": filter}) - r = self.request(url, "delete", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"filter": filter}) + r = self.net.request(url, "delete", h, d) + self.net.raise_exception(r) return database_result() def update(self, filter_str: str, update: dict[str, Any]): url = f"databases/{self.database_name}/tables/{self.table_name}/docs" - h = self.set_up_header(["accept", "content-type"]) - d = self.set_up_data([], {"update": update, "filter": filter_str}) - r = self.request(url, "put", h, d) - self.raise_exception(r) + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"update": update, "filter": filter_str}) + r = self.net.request(url, "put", h, d) + self.net.raise_exception(r) return database_result() -class table_http_result(table_http): + +class table_http_result: def __init__(self, output: list, table_http: table_http): - super().__init__(table_http, table_http.database_name, table_http.table_name) + self.table_http = table_http self.output_res = [] self._output = output @@ -565,8 +568,8 @@ def __init__(self, output: list, table_http: table_http): self._sort = [] def select(self): - url = f"databases/{self.database_name}/tables/{self.table_name}/docs" - h = self.set_up_header(["accept", "content-type"]) + url = f"databases/{self.table_http.database_name}/tables/{self.table_http.table_name}/docs" + h = self.table_http.net.set_up_header(["accept", "content-type"]) tmp = {} if len(self._filter): tmp["filter"] = self._filter @@ -579,9 +582,9 @@ def select(self): if len(self._sort): tmp["sort"] = self._sort # print(tmp) - d = self.set_up_data([], tmp) - r = self.request(url, "get", h, d) - self.raise_exception(r) + d = self.table_http.net.set_up_data([], tmp) + r = self.table_http.net.request(url, "get", h, d) + self.table_http.net.raise_exception(r) # print(r.json()) if "output" in r.json(): self.output_res = r.json()["output"] @@ -590,8 +593,8 @@ def select(self): return self def explain(self, ExplainType=ExplainType.Physical): - url = f"databases/{self.database_name}/tables/{self.table_name}/meta" - h = self.set_up_header(["accept", "content-type"]) + url = f"databases/{self.table_http.database_name}/tables/{self.table_http.table_name}/meta" + h = self.table_http.net.set_up_header(["accept", "content-type"]) tmp = {} if len(self._filter): tmp["filter"] = self._filter @@ -611,9 +614,9 @@ def explain(self, ExplainType=ExplainType.Physical): tmp["highlight"] = self._highlight tmp["explain_type"] = ExplainType_transfrom(ExplainType) # print(tmp) - d = self.set_up_data([], tmp) - r = self.request(url, "get", h, d) - self.raise_exception(r) + d = self.table_http.net.set_up_data([], tmp) + r = self.table_http.net.request(url, "get", h, d) + self.table_http.net.raise_exception(r) message = "" sign = 0 for res in r.json()["output"]: @@ -722,7 +725,7 @@ def to_df(self): self.select() df_dict = {} - col_types = self.show_columns_type() + col_types = self.table_http.show_columns_type() for output_col in self._output: if output_col in col_types: df_dict[output_col] = () diff --git a/python/test_cluster/clear_docker.py b/python/test_cluster/clear_docker.py new file mode 100644 index 0000000000..84645e83c5 --- /dev/null +++ b/python/test_cluster/clear_docker.py @@ -0,0 +1,37 @@ +import docker +import shutil +import os +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Clear docker containers") + parser.add_argument( + "--docker", + action="store_true", + default=False, + ) + + use_docker = parser.parse_args().docker + + # sudo docker rm -f $(sudo docker ps -a -q -f name=minio); + # sudo docker rm -f $(sudo docker ps -a -q -f name=infinity_build); + # sudo docker network rm $(sudo docker network ls -q -f name=infinity_network) + + client = docker.from_env() + + if use_docker: + for container in client.containers.list(all=True, filters={"name": "minio_docker"}): + container.remove(force=True) + for container in client.containers.list( + all=True, filters={"name": "infinity_build"} + ): + container.remove(force=True) + for network in client.networks.list(filters={"name": "infinity_network"}): + network.remove() + else: + for container in client.containers.list(all=True, filters={"name": "minio_host"}): + container.remove(force=True) + + dir_path = "./minio" + if os.path.exists(dir_path) and os.path.isdir(dir_path): + shutil.rmtree(dir_path) \ No newline at end of file diff --git a/python/test_cluster/conftest.py b/python/test_cluster/conftest.py index c41e704d0c..1ad6b720ac 100644 --- a/python/test_cluster/conftest.py +++ b/python/test_cluster/conftest.py @@ -1,12 +1,61 @@ +import pytest +from infinity_cluster import InfinityCluster, MinioParams +from docker_infinity_cluster import DockerInfinityCluster +from mocked_infinity_cluster import MockInfinityCluster + +@pytest.fixture(scope="function") +def skip_if_docker(request): + if request.config.getoption("--docker"): + pytest.skip("Skipping docker test") + + +@pytest.fixture(scope="function") +def skip_if_not_docker(request): + if not request.config.getoption("--docker"): + pytest.skip("Skipping not docker test") + + def pytest_addoption(parser): parser.addoption( "--infinity_path", action="store", - default="./build/Debug/src/infinity", + default="cmake-build-debug/src/infinity", + ) + parser.addoption( + "--docker", + action="store_true", + default=False, + help="Run cluster test on docker", + ) + parser.addoption( + "--minio_dir", + action="store", + default="minio", + ) + parser.addoption( + "--minio_port", + action="store", + default=9001, ) def pytest_generate_tests(metafunc): - if "infinity_path" in metafunc.fixturenames: - infinity_path = metafunc.config.getoption("infinity_path") - metafunc.parametrize("infinity_path", [infinity_path]) + infinity_path = metafunc.config.getoption("infinity_path") + minio_dir = metafunc.config.getoption("minio_dir") + minio_port = metafunc.config.getoption("minio_port") + minio_params = MinioParams(minio_dir, minio_port) + # print(metafunc.fixturenames) + + if metafunc.config.getoption("--docker"): + print("Docker argument is provided") + if "docker_cluster" in metafunc.fixturenames: + docker_infinity_cluster = DockerInfinityCluster(infinity_path, minio_params=minio_params) + metafunc.parametrize("docker_cluster", [docker_infinity_cluster]) + else: + print("Docker argument is not provided") + if "cluster" in metafunc.fixturenames: + infinity_cluster = InfinityCluster(infinity_path, minio_params=minio_params) + metafunc.parametrize("cluster", [infinity_cluster]) + elif "mock_cluster" in metafunc.fixturenames: + mock_infinity_cluster = MockInfinityCluster(infinity_path, minio_params=minio_params) + metafunc.parametrize("mock_cluster", [mock_infinity_cluster]) diff --git a/python/test_cluster/docker_infinity_cluster.py b/python/test_cluster/docker_infinity_cluster.py new file mode 100644 index 0000000000..54c9b39094 --- /dev/null +++ b/python/test_cluster/docker_infinity_cluster.py @@ -0,0 +1,311 @@ +import json +import logging +import os +import random +import string +import sys +import time +from numpy import dtype +import pandas as pd +import tomli +import tomli_w +import docker +from infinity_cluster import ( + BaseInfinityRunner, + InfinityCluster, + MinioParams, + convert_request_to_curl, +) +from mocked_infinity_cluster import convert_request_to_curl +import shutil + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) +from infinity_http import http_network_util, infinity_http + + +class docker_http_response: + def __init__(self, json_output): + self.json_output = json_output + + def json(self): + return self.json_output + + +class docker_http_network(http_network_util): + def __init__(self, container, *args, **kwargs): + super().__init__(*args, **kwargs) + self.container = container + + def request(self, url, method, header={}, data={}): + if header is None: + header = {} + url = self.base_url + url + logging.debug("url: " + url) + + cmd = convert_request_to_curl(method, header, data, url) + print(cmd) + exit_code, output = self.container.exec_run(cmd) + print(output) + assert exit_code is None or exit_code == 0 + try: + return docker_http_response(json.loads(output)) + except json.JSONDecodeError as e: + logging.error(f"Failed to decode JSON response: {e}") + raise + + def raise_exception(self, resp, expect={}): + # todo: handle curl exception + pass + + +class DockerInfinityRunner(BaseInfinityRunner): + def __init__( + self, + container: docker.models.containers.Container, + mock_ip: str, + minio_ip: str | None, + *args, + **kwargs, + ): + self.minio_ip = minio_ip + self.container = container + self.mock_ip = mock_ip + super().__init__(*args, **kwargs) + + def init(self, config_path: str | None): + if config_path: + if self.config_path is not None and os.path.exists(self.config_path): + os.remove(self.config_path) + self.config_path = config_path + self.load_config() + run_cmds = " && ".join( + [ + "cd /infinity", + f"{self.executable_path} --config={self.config_path}", + ] + ) + print(run_cmds) + exit_code, output = self.container.exec_run( + f"bash -c '{run_cmds}'", detach=True + ) + print(f"init infinity: {output}") + assert exit_code is None or exit_code == 0 + + def uninit(self): + timeout = 60 + run_cmds = " && ".join( + [ + "cd /infinity", + f"pid=$(pgrep -f infinity || true)", + f"bash scripts/timeout_kill.sh {timeout} $pid", + ] + ) + print(run_cmds) + exit_code, output = self.container.exec_run(f"bash -c '{run_cmds}'") + print(f"uninit infinity: {output}") + # assert exit_code is None or exit_code == 0 + # self.container.stop() + # self.container.remove(force=True, v=True) + + if os.path.exists(self.config_path): + os.remove(self.config_path) + + def add_client(self, http_addr: str): + http_addr = http_addr.replace("http://", "") + http_ip, http_port = http_addr.split(":") + mock_addr = f"http://{self.mock_ip}:{http_port}" + print(f"add client: {mock_addr}") + self.client = infinity_http(net=docker_http_network(self.container, mock_addr)) + + def peer_uri(self): + peer_port = self.network_config["peer_port"] + return self.mock_ip, peer_port + + def load_config(self): + if not os.path.basename(self.config_path).startswith("mock_"): + config_dir, config_filename = os.path.split(self.config_path) + mock_config_path = os.path.join(config_dir, f"mock_{config_filename}") + shutil.copyfile(self.config_path, mock_config_path) + self.config_path = mock_config_path + + with open(self.config_path, "rb") as f: + config = tomli.load(f) + self.network_config = config["network"] + + if self.minio_ip is not None: + minio_url = config["storage"]["object_storage"]["url"] + minio_ip, minio_port = minio_url.split(":") + + new_minio_url = f"{self.minio_ip}:{minio_port}" + config["storage"]["object_storage"]["url"] = new_minio_url + + config["network"]["server_address"] = self.mock_ip + config["network"]["peer_ip"] = self.mock_ip + + with open(self.config_path, "wb") as f: + tomli_w.dump(config, f) + + +class DockerInfinityCluster(InfinityCluster): + def __init__( + self, + executable_path: str, + *, + minio_params: MinioParams = None, + ): + super().__init__(executable_path) + + image_name = "infiniflow/infinity_builder:centos7_clang18" + docker_client = docker.from_env() + self.image_name = image_name + + network_name = "infinity_network" + try: + self.network = docker_client.networks.get(network_name) + except docker.errors.NotFound: + self.network = docker_client.networks.create( + network_name, + driver="bridge", + ) + + if minio_params is not None: + add = self.add_minio(minio_params, False) + if add: + self.network.connect(self.minio_container) + info = docker_client.api.inspect_network(self.network.id) + minio_ip = info["Containers"][self.minio_container.id]["IPv4Address"] + minio_ip = minio_ip.split("/")[0] + self.minio_ip = minio_ip + self.minio_params = minio_params + + def clear(self): + super().clear() + for runner in self.runners.values(): + runner.uninit() + # self.network.remove() + + def add_node(self, node_name: str, config_path: str): + if node_name in self.runners: + raise ValueError(f"Node {node_name} already exists in the cluster.") + container_name, cpus, tz = self.__init_docker_params() + pwd = os.getcwd() + docker_client = docker.from_env() + + try: + container = docker_client.containers.get(container_name) + added = False + except docker.errors.NotFound: + container = docker_client.containers.run( + image=self.image_name, + name=container_name, + detach=True, + cpuset_cpus=f"0-{cpus - 1}", + volumes=[f"{pwd}:/infinity", "/boot:/boot"], + environment=[f"TZ={tz}"], + ) + added = True + + if added: + self.network.connect(container) + info = docker_client.api.inspect_network(self.network.id) + # print(info) + mock_ip = info["Containers"][container.id]["IPv4Address"] + mock_ip = mock_ip.split("/")[0] + + runner = DockerInfinityRunner( + container, + mock_ip, + self.minio_ip, + node_name, + self.executable_path, + config_path, + ) + self.runners[node_name] = runner + + def remove_node(self, node_name: str): + if node_name not in self.runners: + raise ValueError(f"Node {node_name} not found in the cluster.") + cur_runner: DockerInfinityRunner = self.runners[node_name] + self.network.disconnect(cur_runner.container) + cur_runner.uninit() + del self.runners[node_name] + + def disconnect(self, node_name: str): + if node_name not in self.runners: + raise ValueError(f"Node {node_name} not found in the cluster.") + cur_runner: DockerInfinityRunner = self.runners[node_name] + self.network.disconnect(cur_runner.container) + + def reconnect(self, node_name: str): + if node_name not in self.runners: + raise ValueError(f"Node {node_name} not found in the cluster.") + cur_runner: DockerInfinityRunner = self.runners[node_name] + self.network.connect(cur_runner.container) + docker_client = docker.from_env() + info = docker_client.api.inspect_network(self.network.id) + # print(info) + mock_ip = info["Containers"][cur_runner.container.id]["IPv4Address"] + cur_runner.mock_ip = mock_ip + + def __init_docker_params(self): + container_name = f"infinity_build_{len(self.runners)}" + cpus = os.cpu_count() + tz = os.readlink("/etc/localtime").split("/zoneinfo/")[1] + return container_name, cpus, tz + + +if __name__ == "__main__": + infinity_path = "cmake-build-debug/src/infinity" + minio_dir = "minio" + minio_port = 9001 + cluster = DockerInfinityCluster( + infinity_path, minio_params=MinioParams(minio_dir, minio_port) + ) + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + + cluster.init_leader("node1") + cluster.init_follower("node2") + + time.sleep(1) + print("insert in node1") + + infinity1 = cluster.client("node1") + r = infinity1.list_databases() + + db1 = infinity1.get_database("default_db") + table1 = db1.create_table( + "table1", {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + ) + table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + + res_gt = pd.DataFrame( + { + "c1": (1), + "c2": ([[1.0, 2.0, 3.0, 4.0]]), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + res = table1.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + time.sleep(1) + print("select in node2") + + infinity2 = cluster.client("node2") + db2 = infinity2.get_database("default_db") + table2 = db2.get_table("table1") + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() diff --git a/python/test_cluster/infinity_cluster.py b/python/test_cluster/infinity_cluster.py index 963a39205d..fac777d2d2 100644 --- a/python/test_cluster/infinity_cluster.py +++ b/python/test_cluster/infinity_cluster.py @@ -1,5 +1,10 @@ +from abc import abstractmethod +import json +import random +import string import subprocess import time +import docker import tomli import sys import os @@ -11,46 +16,38 @@ parent_dir = os.path.dirname(current_dir) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) -from infinity_http import infinity_http +from infinity_http import infinity_http, http_network_util -class InfinityRunner: +def convert_request_to_curl(method: str, header: dict, data: dict, url: str): + cmd = "curl -sS --request {method} --url {url} {headers} --data '{data}'" + method = method.upper() + headers = " ".join([f"--header '{key}:{value}'" for key, value in header.items()]) + data = json.dumps(data) + return cmd.format(method=method, headers=headers, data=data, url=url) + + +class MinioParams: + def __init__(self, minio_dir: str, minio_port: int): + self.minio_dir = minio_dir + self.minio_port = minio_port + + +class BaseInfinityRunner: def __init__(self, node_name: str, executable_path: str, config_path: str): self.node_name = node_name self.executable_path = executable_path self.config_path = config_path - self.__load_config() - self.process = None + self.load_config() self.client = None - def __del__(self): - self.uninit() - + @abstractmethod def init(self, config_path: str | None): - if self.process is not None: - raise ValueError("Process is already initialized.") - if config_path: - self.config_path = config_path - self.__load_config() - - cmd = [self.executable_path, f"--config={self.config_path}"] - my_env = os.environ.copy() - my_env["LD_PRELOAD"] = "" - my_env["ASAN_OPTIONS"] = "" - self.process = subprocess.Popen(cmd, shell=False, env=my_env) - time.sleep(1) # Give the process a moment to start - if self.process.poll() is not None: - raise RuntimeError( - f"Failed to start process for node {self.node_name}, return code: {self.process.returncode}" - ) - print(f"Launch {self.node_name} successfully. pid: {self.process.pid}") + pass + @abstractmethod def uninit(self): - print(f"Uniting node {self.node_name}") - if self.process is None: - return - timeout = 60 - timeout_kill.timeout_kill(timeout, self.process) + pass def init_as_standalone(self, config_path: str | None = None): self.init(config_path) @@ -73,8 +70,9 @@ def init_as_follower(self, leader_addr: str, config_path: str | None = None): lambda: self.client.set_role_follower(self.node_name, leader_addr) ) + @abstractmethod def add_client(self, http_addr: str): - self.client = infinity_http(http_addr) + pass def http_uri(self): http_ip = self.network_config["server_address"] @@ -86,12 +84,11 @@ def peer_uri(self): peer_port = self.network_config["peer_port"] return peer_ip, peer_port - def __load_config(self): - with open(self.config_path, "rb") as f: - config = tomli.load(f) - self.network_config = config["network"] + @abstractmethod + def load_config(self): + pass - def __init_cmd(self, send_f, timeout=30): + def __init_cmd(self, send_f, timeout=10): t1 = time.time() while True: try: @@ -105,18 +102,97 @@ def __init_cmd(self, send_f, timeout=30): break +class InfinityRunner(BaseInfinityRunner): + def __init__(self, node_name: str, executable_path: str, config_path: str): + super().__init__(node_name, executable_path, config_path) + self.process = None + + def init(self, config_path: str | None): + if self.process is not None: + raise ValueError("Process is already initialized.") + if config_path: + self.config_path = config_path + self.load_config() + + cmd = [self.executable_path, f"--config={self.config_path}"] + my_env = os.environ.copy() + my_env["LD_PRELOAD"] = "" + my_env["ASAN_OPTIONS"] = "" + self.process = subprocess.Popen(cmd, shell=False, env=my_env) + time.sleep(1) # Give the process a moment to start + if self.process.poll() is not None: + raise RuntimeError( + f"Failed to start process for node {self.node_name}, return code: {self.process.returncode}" + ) + print(f"Launch {self.node_name} successfully. pid: {self.process.pid}") + + def uninit(self): + print(f"Uniting node {self.node_name}") + if self.process is None: + return + timeout = 60 + timeout_kill.timeout_kill(timeout, self.process) + + def add_client(self, http_addr: str): + self.client = infinity_http(net=http_network_util(http_addr)) + + def load_config(self): + with open(self.config_path, "rb") as f: + config = tomli.load(f) + self.network_config = config["network"] + + class InfinityCluster: - def __init__(self, executable_path: str): + def __init__(self, executable_path: str, *, minio_params: MinioParams = None): self.executable_path = executable_path self.runners: dict[str, InfinityRunner] = {} self.leader_runner: InfinityRunner | None = None + if minio_params is not None: + self.add_minio(minio_params, True) + + def clear(self): + for runner in self.runners.values(): + runner.uninit() + # if self.minio_container is not None: + # self.minio_container.remove(force=True, v=True) + def add_node(self, node_name: str, config_path: str): runner = InfinityRunner(node_name, self.executable_path, config_path) if node_name in self.runners: raise ValueError(f"Node {node_name} already exists in the cluster.") self.runners[node_name] = runner + def add_minio(self, minio_params: MinioParams, host_net: bool) -> bool: + minio_image_name = "quay.io/minio/minio" + + minio_cmd = f'server /data --console-address ":{minio_params.minio_port}"' + docker_client = docker.from_env() + kargs = {} + if host_net: + kargs = {"network": "host"} + container_name = "minio_host" + else: + container_name = "minio_docker" + + try: + self.minio_container = docker_client.containers.get(container_name) + return False + except docker.errors.NotFound: + self.minio_container = docker_client.containers.run( + image=minio_image_name, + name=container_name, + detach=True, + environment=[ + "MINIO_ROOT_PASSWORD=minioadmin", + "MINIO_ROOT_USER=minioadmin", + ], + volumes=[f"{minio_params.minio_dir}:/data"], + command=minio_cmd, + **kargs, + ) + return True + def init_standalone(self, node_name: str): if node_name not in self.runners: raise ValueError(f"Node {node_name} not found in the runners.") diff --git a/python/test_cluster/mocked_infinity_cluster.py b/python/test_cluster/mocked_infinity_cluster.py index 310a10aa11..8dd3f36b5d 100644 --- a/python/test_cluster/mocked_infinity_cluster.py +++ b/python/test_cluster/mocked_infinity_cluster.py @@ -3,17 +3,16 @@ import platform import subprocess import sys -from infinity_cluster import InfinityRunner, InfinityCluster +from infinity_cluster import InfinityRunner, InfinityCluster, MinioParams, convert_request_to_curl import os current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) -from infinity_http import infinity_http +from infinity_http import http_network_util, infinity_http - -class mocked_infinity_http(infinity_http): +class mocked_http_network(http_network_util): def __init__(self, ns_name: str, *args, **kwargs): super().__init__(*args, **kwargs) self.ns_name = ns_name @@ -24,7 +23,7 @@ def request(self, url, method, header={}, data={}): url = self.base_url + url logging.debug("url: " + url) - cmd = mocked_infinity_http.__convert_request_to_curl(method, header, data, url) + cmd = convert_request_to_curl(method, header, data, url) cmd = f"sudo ip netns exec {self.ns_name} {cmd}" print(f"cmd: {cmd}") result = subprocess.run(cmd, shell=True, capture_output=True, text=True) @@ -34,14 +33,6 @@ def raise_exception(self, resp, expect={}): # todo: handle curl exception pass - @staticmethod - def __convert_request_to_curl(method: str, header: dict, data: dict, url: str): - cmd = "curl -X {method} -H {headers} -d '{data}' '{uri}'" - method = method.upper() - headers = " ".join([f"-H '{key}:{value}'" for key, value in header.items()]) - data = json.dumps(data) - return cmd.format(method=method, headers=headers, data=data, uri=url) - class MockedInfinityRunner(InfinityRunner): def __init__( @@ -61,14 +52,14 @@ def init(self, config_path: str | None): raise ValueError("Process is already initialized.") if config_path: self.config_path = config_path - self.__load_config() + self.load_config() run_cmd = f"{self.executable_path} --config={self.config_path} 2>&1" run_cmd = f"sudo ip netns exec {self.ns_name} {run_cmd}" self.process = subprocess.Popen(run_cmd, shell=True) def add_client(self, http_addr: str): - self.client = mocked_infinity_http(self.ns_name, http_addr) + self.client = infinity_http(net=mocked_http_network(self.ns_name, http_addr)) def peer_uri(self): peer_port = self.network_config["peer_port"] @@ -76,19 +67,18 @@ def peer_uri(self): class MockInfinityCluster(InfinityCluster): - def __init__(self, executable_path: str): - super().__init__(executable_path) + def __init__(self, executable_path: str, *, minio_params: MinioParams = None): + super().__init__(executable_path, minio_params=minio_params) self.ns_prefix = "ns" self.bridge_name = "br0" self.mock_ip_prefix = "17.0.0." self.mock_ip_mask = 24 - self.mock_port = 1 self.cur_ip_suffix = 1 self.first_mock_ip = None # for ping test self.__check_prerequisites() self.__prepare_bridge() - def __del__(self): + def clear(self): subprocess.run(f"sudo ip link set {self.bridge_name} down".split()) subprocess.run(f"sudo brctl delbr {self.bridge_name}".split()) for ns_name in self.runners: @@ -97,13 +87,13 @@ def __del__(self): subprocess.run(f"sudo ip link delete {veht_name}".split()) def add_node(self, node_name: str, config_path: str): + if node_name in self.runners: + raise ValueError(f"Node {node_name} already exists in the cluster.") ns_name = f"{self.ns_prefix}_{node_name}" mock_ip = self.__connect_to_bridge(ns_name, ping=False) runner = MockedInfinityRunner( mock_ip, ns_name, node_name, self.executable_path, config_path ) - if node_name in self.runners: - raise ValueError(f"Node {node_name} already exists in the cluster.") self.runners[node_name] = runner def remove_node(self, node_name: str): diff --git a/python/test_cluster/requirements.txt b/python/test_cluster/requirements.txt index 793296639a..5c64b46319 100644 --- a/python/test_cluster/requirements.txt +++ b/python/test_cluster/requirements.txt @@ -1,2 +1,4 @@ tomli~=2.0.2 -psutil~=6.0.0 \ No newline at end of file +psutil~=6.0.0 +docker~=7.1.0 +tomli-w~=1.1.0 diff --git a/python/test_cluster/test_basic.py b/python/test_cluster/test_basic.py index 8ca1b0c455..e180a331d1 100644 --- a/python/test_cluster/test_basic.py +++ b/python/test_cluster/test_basic.py @@ -3,10 +3,11 @@ import pytest from infinity_cluster import InfinityCluster from mocked_infinity_cluster import MockInfinityCluster +from docker_infinity_cluster import DockerInfinityCluster, MinioParams -def test_standalone(infinity_path: str): - cluster = InfinityCluster(infinity_path) +@pytest.mark.usefixtures("skip_if_docker") +def test_standalone(cluster: InfinityCluster): cluster.add_node("test", "conf/pytest_parallel_infinity_conf.toml") cluster.init_standalone("test") test_client = cluster.client("test") @@ -15,10 +16,11 @@ def test_standalone(infinity_path: str): test_client.create_database("db1") test_client.drop_database("db1") + cluster.clear() -@pytest.mark.skip(reason="bug") -def test_0(infinity_path: str): - cluster = InfinityCluster(infinity_path) + +@pytest.mark.usefixtures("skip_if_docker") +def test_0(cluster: InfinityCluster): cluster.add_node("node1", "conf/leader.toml") cluster.add_node("node2", "conf/follower.toml") @@ -30,10 +32,14 @@ def test_0(infinity_path: str): cluster.remove_node("node2") cluster.remove_node("node1") + cluster.clear() + + +@pytest.mark.usefixtures("skip_if_docker") +@pytest.mark.skip(reason="deprecated") +def test_mock(mock_cluster: MockInfinityCluster): + cluster = mock_cluster -@pytest.mark.skip(reason="bug") -def test_mock(infinity_path: str): - cluster = MockInfinityCluster(infinity_path) cluster.add_node("node1", "conf/leader.toml") cluster.add_node("node2", "conf/follower.toml") @@ -51,6 +57,35 @@ def test_mock(infinity_path: str): cluster.remove_node("node2") cluster.remove_node("node1") + cluster.clear() + + +@pytest.mark.usefixtures("skip_if_not_docker") +def test_docker(docker_cluster: DockerInfinityCluster): + cluster = docker_cluster + + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + + cluster.init_leader("node1") + cluster.init_follower("node2") + + time.sleep(1) + + cluster.disconnect("node2") + time.sleep(0.1) + cluster.reconnect("node2") + + res = cluster.client("node1").list_databases() + print(res.db_names) + + time.sleep(1) + + print("remove nodes") + + cluster.clear() ''' tc1: n1: admin @@ -77,4 +112,4 @@ def test_mock(infinity_path: str): show node on n1 & n2; select data t1 on n1 and n2; -''' \ No newline at end of file +''' diff --git a/python/test_cluster/test_insert.py b/python/test_cluster/test_insert.py new file mode 100644 index 0000000000..26b01a5d21 --- /dev/null +++ b/python/test_cluster/test_insert.py @@ -0,0 +1,68 @@ +from numpy import dtype +import pandas as pd +from infinity_cluster import InfinityCluster +from docker_infinity_cluster import DockerInfinityCluster +import pytest +import time +from infinity.errors import ErrorCode + + +class TestInsert: + def __test_inner_1(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + + cluster.init_leader("node1") + cluster.init_follower("node2") + + time.sleep(1) + print("insert in node1") + + infinity1 = cluster.client("node1") + r = infinity1.list_databases() + + table_name = "table1" + db1 = infinity1.get_database("default_db") + table1 = db1.create_table( + table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + ) + table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + + res_gt = pd.DataFrame( + { + "c1": (1), + "c2": ([[1.0, 2.0, 3.0, 4.0]]), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + res = table1.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + time.sleep(1) + print("select in node2") + + infinity2 = cluster.client("node2") + db2 = infinity2.get_database("default_db") + table2 = db2.get_table(table_name) + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + res = db1.drop_table(table_name) + assert res.error_code == ErrorCode.OK + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() + + @pytest.mark.usefixtures("skip_if_docker") + def test_insert_11(self, cluster: InfinityCluster): + self.__test_inner_1(cluster) + + @pytest.mark.usefixtures("skip_if_not_docker") + def test_insert_12(self, docker_cluster: DockerInfinityCluster): + self.__test_inner_1(docker_cluster) diff --git a/scripts/Dockerfile_infinity_builder_centos7 b/scripts/Dockerfile_infinity_builder_centos7 index 67631eaa3c..a35873998c 100644 --- a/scripts/Dockerfile_infinity_builder_centos7 +++ b/scripts/Dockerfile_infinity_builder_centos7 @@ -35,7 +35,7 @@ RUN --mount=type=bind,source=gcc-13.2.0.tar.xz,target=/root/gcc-13.2.0.tar.xz \ && cd /root && mkdir build-gcc && cd build-gcc \ && ../gcc-13.2.0/configure --enable-languages=c,c++ \ --disable-multilib --with-pic \ - && make -j12 && make install-strip \ + && make -j16 && make install-strip \ && cd /root && rm -rf build-gcc && rm -rf gcc-13.2.0 \ && ln -s gcc /usr/local/bin/cc && ldconfig @@ -240,4 +240,9 @@ RUN git clone --single-branch --depth=1 https://github.com/VectorCamp/vectorscan && make -j 12 install \ && rm -rf ../../vectorscan +# Install Docker +# https://docs.docker.com/engine/install/centos/#install-using-the-convenience-script +RUN cd /root && curl -fsSL https://get.docker.com -o get-docker.sh \ + && sh get-docker.sh + ENTRYPOINT [ "bash", "-c", "while true; do sleep 60; done"] diff --git a/scripts/timeout_kill.sh b/scripts/timeout_kill.sh index c9a952667b..81c1baee53 100644 --- a/scripts/timeout_kill.sh +++ b/scripts/timeout_kill.sh @@ -7,7 +7,11 @@ if [ $# -lt 1 ]; then fi DURATION=$1 - +# Check if no PIDs are provided +if [ $# -eq 1 ]; then + echo "No PIDs provided. Exiting." + exit 0 +fi # kill all infinity process for pid in "${@:2}"; do # Send SIGTERM diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index 81d5fb727d..8d352b99d2 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -128,6 +128,7 @@ void Storage::SetStorageMode(StorageMode target_mode) { if (VirtualStore::IsInit()) { UnrecoverableError("remote storage system was initialized before."); } + LOG_INFO(fmt::format("Init remote store url: {}", config_ptr_->ObjectStorageUrl())); Status status = VirtualStore::InitRemoteStore(StorageType::kMinio, config_ptr_->ObjectStorageUrl(), config_ptr_->ObjectStorageHttps(), diff --git a/tools/run_cluster_test.py b/tools/run_cluster_test.py index 9c0dea870e..ac73f7a393 100644 --- a/tools/run_cluster_test.py +++ b/tools/run_cluster_test.py @@ -12,27 +12,42 @@ type=str, default="./build/Debug/src/infinity", ) + parser.add_argument( + "--docker", + action="store_true", + default=False, + ) args = parser.parse_args() infinity_path = args.infinity_path + docker = args.docker current_path = os.getcwd() python_test_dir = current_path + "/python" - process = subprocess.Popen( - [ - python_executable, - "-m", - "pytest", - f"{python_test_dir}/test_cluster", - f"--infinity_path={infinity_path}", - "-x", - "-s", - "-m", - "not slow", - ] - ) + cmd = [ + python_executable, + "-m", + "pytest", + f"{python_test_dir}/test_cluster", + f"--infinity_path={infinity_path}", + "-x", + "-s", + "-m", + "not slow", + ] + if docker: + cmd.append("--docker") + process = subprocess.Popen(cmd) process.wait() + + cmd = [ + python_executable, + f"{python_test_dir}/test_cluster/clear_docker.py", + ] + if docker: + cmd.append("--docker") + process2 = subprocess.run(cmd) if process.returncode != 0: print(f"An error occurred: {process.stderr}") sys.exit(-1)