diff --git a/.github/workflows/slow_test.yml b/.github/workflows/slow_test.yml index 40b126dc8b..68be1e0181 100644 --- a/.github/workflows/slow_test.yml +++ b/.github/workflows/slow_test.yml @@ -1,12 +1,21 @@ name: slow_tests on: + # Schedule the workflow to run at 00:30 UTC+8 every day + # https://docs.github.com/zh/actions/writing-workflows/choosing-when-your-workflow-runs/events-that-trigger-workflows#schedule + schedule: + - cron: '30 16 * * *' # 00:30 - 8 = 16:30 # The "create tags" trigger is specifically focused on the creation of new tags, while the "push tags" trigger is activated when tags are pushed, including both new tag creations and updates to existing tags. create: tags: - "v*.*.*" # normal release - "nightly" # mutable tag - "slow-test" # mutable tag + pull_request: + types: [ opened, synchronize, reopened, labeled ] + paths-ignore: + - 'docs/**' + - '*.md' # https://docs.github.com/en/actions/using-jobs/using-concurrency concurrency: @@ -16,6 +25,7 @@ concurrency: jobs: slow_tests: name: run slow test + if: ${{ github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'invalid') }} runs-on: ["self-hosted", "slow-test" ] steps: @@ -47,32 +57,62 @@ jobs: - name: Build release version if: ${{ !cancelled() && !failure() }} - run: sudo docker exec ${BUILDER_CONTAINER} bash -c "git config --global safe.directory \"*\" && cd /infinity && rm -fr cmake-build-release && mkdir -p cmake-build-release && cmake -G Ninja -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_JOB_POOLS:STRING=link=8 -S /infinity -B /infinity/cmake-build-release && cmake --build /infinity/cmake-build-release --target infinity test_main knn_import_benchmark knn_query_benchmark" + run: sudo docker exec ${BUILDER_CONTAINER} bash -c "git config --global safe.directory \"*\" && cd /infinity && rm -fr cmake-build-release && mkdir -p cmake-build-release && cmake -G Ninja -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_JOB_POOLS:STRING=link=8 -S /infinity -B /infinity/cmake-build-release && cmake --build /infinity/cmake-build-release --target infinity" - name: Install pysdk for Python 3.10 if: ${{ !cancelled() && !failure() }} - run: sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-sdk && cd python/infinity_sdk/ && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd ../.." + run: | + sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-sdk && cd python/infinity_sdk/ && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && cd ../.." + sudo docker exec ${BUILDER_CONTAINER} bash -c "rm -rf /root/.config/pip/pip.conf && cd /infinity/ && pip3 uninstall -y infinity-embedded-sdk && pip3 install . -v --config-settings=cmake.build-type='RelWithDebInfo' --config-settings=build-dir='cmake-build-release' -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" - - name: Start infinity release version + - name: Prepare restart test data + if: ${{ !cancelled() && !failure() }} + run: | + RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX:-$HOME} + echo "RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX}" >> $GITHUB_ENV + sudo python3 scripts/prepare_restart_test_data.py --from_dir=${RUNNER_WORKSPACE_PREFIX} --op=add + + - name: Run restart test if: ${{ !cancelled() && !failure() }} + id: run_restart_test + run : | + sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/restart_test/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" + sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && python3 tools/run_restart_test_continuously.py --infinity_path=cmake-build-release/src/infinity" + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pytest python/restart_test/test_insert.py -k "test_data[infinity_runner0-columns5-gen-1000000-test/data/config/restart_test/test_insert/1.toml]" -s --infinity_path=cmake-build-release/src/infinity" + # sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pytest python/restart_test/test_insert.py -k "test_index[infinity_runner0-columns2-indexes2-gen-1000000-test/data/config/restart_test/test_insert/1.toml]" -s --infinity_path=cmake-build-release/src/infinity" + + - name: Collect restart test output + if: ${{ !cancelled() }} # always run this step even if previous steps failed + # remove symbolic link + # find all log file like [debug.log.*] in directory, and cat to stdout + run: | + sudo python3 scripts/prepare_restart_test_data.py --op=remove + failure="${{ steps.run_restart_test.outcome == 'failure'}}" + sudo python3 scripts/collect_restart_log.py --executable_path=cmake-build-release/src/infinity --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} + + - name: Start infinity release version + if: ${{ !cancelled() && !failure()}} + # && !contains(github.event.pull_request.labels.*.name, 'invalid') run: | # Run a command in the background sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && cmake-build-release/src/infinity --config=conf/pytest_parallel_continuous_conf.toml > release.log 2> release_error.log" & - name: Run pysdk remote infinity & parallel & http_api & sqllogic test release version continously - if: ${{ !cancelled() && !failure() }} + if: ${{ !cancelled() && !failure()}} + # && !contains(github.event.pull_request.labels.*.name, 'invalid') id: run_py_tests run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && python3 tools/run_pytest_parallel_continuous.py" && sleep 1s - name: Stop infinity release - if: ${{ !cancelled() }} + if: ${{ !cancelled() && !failure()}} + # && !contains(github.event.pull_request.labels.*.name, 'invalid') id: stop_py_tests run: | pids=$(sudo docker exec ${BUILDER_CONTAINER} pgrep -f cmake-build-release/src/infinity | xargs echo) sudo chmod +x scripts/timeout_kill.sh sudo docker exec ${BUILDER_CONTAINER} bash -c "/infinity/scripts/timeout_kill.sh 10 ${pids}" if [ $? -ne 0 ]; then - echo "Failed to kill infinity debug version" + echo "Failed to kill infinity release version" exit 1 fi @@ -82,29 +122,6 @@ jobs: failure="${{ steps.run_py_tests.outcome == 'failure' || steps.stop_py_tests.outcome == 'failure' }}" sudo python3 scripts/collect_log.py --log_path=/var/infinity/log/infinity.log --stdout_path=release.log --stderror_path=release_error.log --executable_path=cmake-build-release/src/infinity --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - - name: Prepare restart test data - if: ${{ !cancelled() && !failure() }} - run: | - RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX:-$HOME} - echo "RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX}" >> $GITHUB_ENV - touch tmp.txt && echo "${RUNNER_WORKSPACE_PREFIX}" > tmp.txt - sudo mkdir -p test/data/benchmark && sudo ln -sf ${RUNNER_WORKSPACE_PREFIX}/benchmark/enwiki/enwiki-10w.csv test/data/benchmark/enwiki-10w.csv - - - name: Run restart test - if: ${{ !cancelled() && !failure() }} - run : | - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/restart_test/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn" - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pytest python/restart_test/test_insert.py -k "test_data[infinity_runner0-columns5-gen-1000000-test/data/config/restart_test/test_insert/1.toml]" -s --infinity_path=cmake-build-release/src/infinity" - sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pytest python/restart_test/test_insert.py -k "test_index[infinity_runner0-columns2-indexes2-gen-1000000-test/data/config/restart_test/test_insert/1.toml]" -s --infinity_path=cmake-build-release/src/infinity" - - - name: Collect restart test output - if: ${{ !cancelled() }} # always run this step even if previous steps failed - # remove symbolic link - # find all log file like [debug.log.*] in directory, and cat to stdout - run: | - sudo rm -f test/data/benchmark/enwiki-10w.csv - find . -name "restart_test.log.*" -exec cat {} \; - - name: Destroy builder container if: always() # always run this step even if previous steps failed run: | diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 852123317a..763de7055c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -69,20 +69,17 @@ jobs: run: | RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX:-$HOME} echo "RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX}" >> $GITHUB_ENV - touch tmp.txt && echo "${RUNNER_WORKSPACE_PREFIX}" > tmp.txt - sudo mkdir -p test/data/benchmark && sudo ln -sf ${RUNNER_WORKSPACE_PREFIX}/benchmark/enwiki/enwiki-10w.csv test/data/benchmark/enwiki-10w.csv - name: Run restart test if: ${{ !cancelled() && !failure() }} + id: run_restart_test run : sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/restart_test/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && LD_PRELOAD=/usr/local/lib/clang/18/lib/x86_64-unknown-linux-gnu/libclang_rt.asan.so ASAN_OPTIONS=detect_leaks=0 python3 tools/run_restart_test.py --infinity_path=cmake-build-debug/src/infinity" - name: Collect restart test output if: ${{ !cancelled() }} # always run this step even if previous steps failed - # remove symbolic link - # find all log file like [debug.log.*] in directory, and cat to stdout run: | - sudo rm -f test/data/benchmark/enwiki-10w.csv - find . -name "restart_test.log.*" -exec cat {} \; + failure="${{ steps.run_restart_test.outcome == 'failure'}}" + sudo python3 scripts/collect_restart_log.py --executable_path=cmake-build-debug/src/infinity --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - name: Start infinity debug version if: ${{ !cancelled() && !failure() }} @@ -208,20 +205,17 @@ jobs: run: | RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX:-$HOME} echo "RUNNER_WORKSPACE_PREFIX=${RUNNER_WORKSPACE_PREFIX}" >> $GITHUB_ENV - touch tmp.txt && echo "${RUNNER_WORKSPACE_PREFIX}" > tmp.txt - sudo mkdir -p test/data/benchmark && sudo ln -sf ${RUNNER_WORKSPACE_PREFIX}/benchmark/enwiki/enwiki-10w.csv test/data/benchmark/enwiki-10w.csv - name: Run restart test if: ${{ !cancelled() && !failure() }} + id: run_restart_test run : sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && pip3 install -r python/restart_test/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host tuna.tsinghua.edu.cn && python3 tools/run_restart_test.py --infinity_path=cmake-build-release/src/infinity" - name: Collect restart test output if: ${{ !cancelled() }} # always run this step even if previous steps failed - # remove symbolic link - # find all log file like [debug.log.*] in directory, and cat to stdout run: | - sudo rm -f test/data/benchmark/enwiki-10w.csv - find . -name "restart_test.log.*" -exec cat {} \; + failure="${{ steps.run_restart_test.outcome == 'failure'}}" + sudo python3 scripts/collect_restart_log.py --executable_path=cmake-build-release/src/infinity --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} - name: Test embedded infinity for Python 3.10 if: ${{ !cancelled() && !failure() }} diff --git a/python/restart_test/conftest.py b/python/restart_test/conftest.py index 79bc4435b7..2b5a30be91 100644 --- a/python/restart_test/conftest.py +++ b/python/restart_test/conftest.py @@ -1,4 +1,5 @@ from infinity_runner import InfinityRunner +from restart_timeout import * def pytest_addoption(parser): @@ -19,3 +20,10 @@ def pytest_generate_tests(metafunc): runner = InfinityRunner(infinity_path) metafunc.parametrize("infinity_runner", [runner]) pass + + +# def pytest_collection_modifyitems(config, items): +# for item in items: +# # Apply the decorator to each test function +# test_name = item.name +# item.obj = my_timeout(test_name)(item.obj) diff --git a/python/restart_test/infinity_runner.py b/python/restart_test/infinity_runner.py index 86143f0340..bef3970297 100644 --- a/python/restart_test/infinity_runner.py +++ b/python/restart_test/infinity_runner.py @@ -13,6 +13,10 @@ def __init__(self, infinity_path: str): self.default_config_path = "./conf/infinity_conf.toml" self.script_path = "./scripts/timeout_kill.sh" self.infinity_path = infinity_path + + if not os.access(self.infinity_path, os.X_OK): + raise Exception(f"{self.infinity_path} is not executable.") + self.i = 0 def clear(self): @@ -21,15 +25,23 @@ def clear(self): ) os.system(f"rm -rf restart_test.log.*") print(f"clear {self.data_dir}") + self.i = 0 def init(self, config_path: str | None = None): + init_timeout = 60 if config_path is None: config_path = self.default_config_path cmd = f"{self.infinity_path} --config={config_path} > restart_test.log.{self.i} 2>&1" - pids = [proc.pid for proc in psutil.process_iter(['pid', 'name']) if "infinity" in proc.info['name']] + pids = [ + proc.pid + for proc in psutil.process_iter(["pid", "name"]) + if "infinity" in proc.info["name"] + ] if len(pids) > 0: - ret = os.system(f"bash {self.script_path} 30 {' '.join(map(str, pids))}") + ret = os.system( + f"bash {self.script_path} {init_timeout} {' '.join(map(str, pids))}" + ) if ret != 0: raise Exception("An error occurred.") @@ -41,7 +53,9 @@ def init(self, config_path: str | None = None): self.i += 1 def uninit(self): - timeout = 30 + if self.process is None: + return + timeout = 60 pids = [] for child in psutil.Process(self.process.pid).children(recursive=True): pids.append(child.pid) @@ -50,6 +64,10 @@ def uninit(self): ret = os.system(f"bash {self.script_path} {timeout} {' '.join(map(str, pids))}") if ret != 0: raise Exception("An error occurred.") + self.process = None + + def connected(self): + return self.process is not None @staticmethod def connect(uri: str): @@ -71,7 +89,7 @@ def decorator(f): def wrapper(*args, **kwargs): infinity_runner.init(config_path) infinity_obj = InfinityRunner.connect(uri) - try : + try: f(infinity_obj, *args, **kwargs) finally: infinity_obj.disconnect() diff --git a/python/restart_test/requirements.txt b/python/restart_test/requirements.txt index a138033cad..0769c60dcd 100644 --- a/python/restart_test/requirements.txt +++ b/python/restart_test/requirements.txt @@ -1 +1,2 @@ -psutil~=6.0.0 \ No newline at end of file +psutil~=6.0.0 +timeout-decorator~=0.5.0 diff --git a/python/restart_test/restart_timeout.py b/python/restart_test/restart_timeout.py new file mode 100644 index 0000000000..0de71ccba7 --- /dev/null +++ b/python/restart_test/restart_timeout.py @@ -0,0 +1,31 @@ +import timeout_decorator + +# Description: use timeout_decorator as backup solution if a test not finish in time + +ONE_TEST_TIMEOUT = 60 * 60 # 1 hour + + +class TimeoutException(Exception): + def __init__(self, name): + super().__init__(name) + print(f"Timeout decorator for {name}") + self.timeout_name = name + + def __str__(self): + return f"Timeout: run {self.timeout_name} exceed {ONE_TEST_TIMEOUT} seconds" + + +# wrap timeout_decorator with time and exception +def my_timeout(timeout_name): + def wrapper(func): + def inner(*args, **kwargs): + try: + return timeout_decorator.timeout(ONE_TEST_TIMEOUT, use_signals=False)( + func + )(*args, **kwargs) + except timeout_decorator.timeout_decorator.TimeoutError: + raise TimeoutException(timeout_name) + + return inner + + return wrapper diff --git a/python/restart_test/test_fulltext.py b/python/restart_test/test_fulltext.py index 1f0bfe303b..3dbeb90c2e 100644 --- a/python/restart_test/test_fulltext.py +++ b/python/restart_test/test_fulltext.py @@ -11,19 +11,20 @@ from infinity.errors import ErrorCode from restart_util import * + @pytest.mark.slow class TestFullText: - @pytest.mark.skip(reason="not tested") @pytest.mark.parametrize( "config", [ "test/data/config/restart_test/test_fulltext/1.toml", - # "test/data/config/restart_test/test_fulltext/2.toml", - # "test/data/config/restart_test/test_fulltext/3.toml", + "test/data/config/restart_test/test_fulltext/2.toml", + "test/data/config/restart_test/test_fulltext/3.toml", ], ) def test_fulltext(self, infinity_runner: InfinityRunner, config: str): - enwiki_path = "test/data/csv/enwiki-10w.csv" + # should add symbolic link in advance + enwiki_path = "test/data/benchmark/enwiki-10w.csv" enwiki_size = 100000 table_name = "test_fulltext" @@ -59,8 +60,8 @@ def test_fulltext(self, infinity_runner: InfinityRunner, config: str): }, ConflictType.Error, ) - enwiki_gen1 = EnwikiGenerator.gen_factory(enwiki_path) - for id, (title, date, body) in enumerate(enwiki_gen1(enwiki_size)): + enwiki_gen1 = EnwikiGenerator.gen_factory(enwiki_path)(enwiki_size) + for id, (title, date, body) in enumerate(enwiki_gen1): gt_table_obj.insert( [{"id": id, "title": title, "date": date, "body": body}] ) @@ -83,13 +84,13 @@ def test_fulltext(self, infinity_runner: InfinityRunner, config: str): infinity_runner.uninit() cur_insert_n = 0 - enwiki_gen2 = EnwikiGenerator.gen_factory(enwiki_path) + enwiki_gen2 = EnwikiGenerator.gen_factory(enwiki_path)(enwiki_size) end = False + qu = queue.Queue() def work_func(table_obj, gt_table_obj): - nonlocal cur_insert_n - nonlocal end - while True: + nonlocal cur_insert_n, end, qu + while not end: r = random.randint(0, 500 - 1) try: if r < 499: @@ -111,12 +112,21 @@ def work_func(table_obj, gt_table_obj): ) cur_insert_n += 1 else: - print(f"search at {cur_insert_n}") + gt_res = ( + gt_table_obj.output(["id"]) + .match_text( + fields="body", + matching_text=matching_text, + topn=10, + extra_options=None, + ) + .filter("id<{}".format(cur_insert_n)) + .to_result() + ) - result_queue = queue.Queue() + def t1(): + print(f"search at {cur_insert_n}") - def wait_and_search(): - time.sleep(search_after_insert) res = ( table_obj.output(["id"]) .match_text( @@ -127,46 +137,43 @@ def wait_and_search(): ) .to_result() ) - result_queue.put(res) - - t1 = threading.Thread(target=wait_and_search) - t1.start() - gt_res = ( - gt_table_obj.output(["id"]) - .match_text( - fields="body", - matching_text=matching_text, - topn=10, - extra_options=None, - ) - .filter("id<{}".format(cur_insert_n)) - .to_result() - ) - t1.join() - res = result_queue.get() + data_dict, _ = res + gt_data_dict, _ = gt_res + if data_dict != gt_data_dict: + print(f"diff: {data_dict} {gt_data_dict}") + else: + print("same") + search_time = time.time() + search_after_insert + qu.put((t1, search_time)) - data_dict, _ = res - gt_data_dict, _ = gt_res - assert data_dict == gt_data_dict - print(data_dict) + except Exception as e: + print(e) + assert shutdown + break + def search_thread(): + nonlocal qu, shutdown + while True: + (f, search_time) = qu.get() + while time.time() < search_time: + time.sleep(0.1) + try: + f() except Exception as e: print(e) - assert shutdown == True + assert shutdown break def shutdown_func(): nonlocal shutdown - shutdown = True + shutdown = False time.sleep(shutdown_interval) + shutdown = True infinity_runner.uninit() print(f"cur_insert_n: {cur_insert_n}") print("shutdown infinity") - assert shutdown == False - shutdown = True - return while not end: infinity_runner.init(config) @@ -178,7 +185,13 @@ def shutdown_func(): t1 = threading.Thread(target=shutdown_func) t1.start() + + t2 = threading.Thread(target=search_thread) + t2.start() + work_func(table_obj, gt_table_obj) + + t2.join() t1.join() infinity_runner.uninit() diff --git a/python/restart_test/test_insert.py b/python/restart_test/test_insert.py index 0e0240baa2..9adf8f40fc 100644 --- a/python/restart_test/test_insert.py +++ b/python/restart_test/test_insert.py @@ -24,15 +24,16 @@ def insert_inner( uri = common_values.TEST_LOCAL_HOST cur_insert_n = 0 - gen_finished = False + shutdown = False + error = False def insert_func(table_obj): - nonlocal cur_insert_n, gen_finished + nonlocal cur_insert_n, shutdown, error batch_size = 10 - while cur_insert_n < insert_n and not gen_finished: + while cur_insert_n < insert_n: + insert_data = [] try: - insert_data = [] # get `batch_size` data in data_gen one time for i in range(batch_size): try: @@ -42,33 +43,39 @@ def insert_func(table_obj): data_line[col_name] = col_data insert_data.append(data_line) except StopIteration: - gen_finished = True break - table_obj.insert(insert_data) + if len(insert_data) > 0: + table_obj.insert(insert_data) + else: + cur_insert_n = insert_n except Exception as e: + print(f"insert error at {cur_insert_n}") + if not shutdown: + error = True + raise e break - cur_insert_n += batch_size - if cur_insert_n == insert_n: - gen_finished = True + cur_insert_n += len(insert_data) shutdown_time = 0 def shutdown_func(): - nonlocal cur_insert_n, shutdown_time + nonlocal cur_insert_n, shutdown_time, shutdown, error + shutdown = False last_shutdown_insert_n = cur_insert_n - while True: - if gen_finished or ( + while not error: + if cur_insert_n >= insert_n or ( stop_n != 0 and cur_insert_n - last_shutdown_insert_n >= insert_n // stop_n ): + shutdown = True infinity_runner.uninit() print("shutdown infinity") shutdown_time += 1 return - print(f"cur_insert_n: {cur_insert_n}") + print(f"cur_insert_n inner: {cur_insert_n}") time.sleep(0.1) - while not gen_finished: + while cur_insert_n < insert_n: infinity_runner.init(config) infinity_obj = InfinityRunner.connect(uri) @@ -89,7 +96,7 @@ def shutdown_func(): "insert_n, config", [ (100000, "test/data/config/restart_test/test_insert/1.toml"), - (1000000, "test/data/config/restart_test/test_insert/1.toml"), + # (1000000, "test/data/config/restart_test/test_insert/1.toml"), ], ) @pytest.mark.parametrize( @@ -155,7 +162,7 @@ def test_data( "insert_n,config", [ (100000, "test/data/config/restart_test/test_insert/1.toml"), - (1000000, "test/data/config/restart_test/test_insert/1.toml"), + # (1000000, "test/data/config/restart_test/test_insert/1.toml"), ], ) @pytest.mark.parametrize( diff --git a/python/restart_test/test_insert_import.py b/python/restart_test/test_insert_import.py index a84042ffd8..192d2c5893 100644 --- a/python/restart_test/test_insert_import.py +++ b/python/restart_test/test_insert_import.py @@ -26,25 +26,27 @@ def insert_import_inner( data_gen = data_gen_factory(total_n) stop_n = 10 + interval_n = total_n // stop_n uri = common_values.TEST_LOCAL_HOST + insert_batch_size = 10 + import_rate = insert_batch_size / (import_size + insert_batch_size) + cur_n = 0 - gen_finished = False insert_finish = False + shutdown = False + error = False def insert_import_func(table_obj): - nonlocal cur_n, gen_finished, insert_finish - insert_batch_size = 10 - - import_rate = insert_batch_size / (import_size + insert_batch_size) + nonlocal cur_n, insert_finish, shutdown, error - while cur_n < total_n and not gen_finished: + while cur_n < total_n: r = random.randint(0, 100) if_import = r < import_rate * 100 if insert_finish: print("insert finished") - if not if_import and not insert_finish: - try: + try: + if not if_import and not insert_finish: insert_data = [] # get `batch_size` data in data_gen one time for i in range(insert_batch_size): @@ -57,37 +59,41 @@ def insert_import_func(table_obj): except StopIteration: insert_finish = True break - table_obj.insert(insert_data) - except Exception as e: - break - cur_n += insert_batch_size - else: - try: + if len(insert_data) > 0: + table_obj.insert(insert_data) + else: + cur_n = total_n + cur_n += insert_batch_size + else: abs_import_file = os.path.abspath(import_file) table_obj.import_data(abs_import_file, import_options) - except Exception as e: - break - cur_n += import_size - if cur_n >= total_n: - gen_finished = True + cur_n += import_size + except Exception as e: + print(f"insert/import {if_import} error at {cur_n}") + if not shutdown: + error = True + raise e + break shutdown_time = 0 def shutdown_func(): - nonlocal cur_n, shutdown_time + nonlocal cur_n, shutdown_time, shutdown, error + shutdown = False last_shutdown_n = cur_n - while True: - if gen_finished or ( - stop_n != 0 and cur_n - last_shutdown_n >= total_n // stop_n + while not error: + if cur_n >= total_n or ( + stop_n != 0 and cur_n - last_shutdown_n >= interval_n ): + shutdown = True infinity_runner.uninit() print("shutdown infinity") shutdown_time += 1 return - print(f"cur_n: {cur_n}") + print(f"cur_n inner: {cur_n}") time.sleep(0.1) - while not gen_finished: + while cur_n < total_n: infinity_runner.init(config) infinity_obj = InfinityRunner.connect(uri) diff --git a/python/restart_test/test_shutdown_pytest.py b/python/restart_test/test_shutdown_pytest.py index 5545983a53..ec988e4d23 100644 --- a/python/restart_test/test_shutdown_pytest.py +++ b/python/restart_test/test_shutdown_pytest.py @@ -10,6 +10,7 @@ @pytest.mark.slow +@pytest.mark.skip(reason="skip") class TestShutDownPytest: finish_one = False diff --git a/scripts/collect_restart_log.py b/scripts/collect_restart_log.py new file mode 100644 index 0000000000..0036efd2b0 --- /dev/null +++ b/scripts/collect_restart_log.py @@ -0,0 +1,75 @@ +import argparse +import os +import random +import re +import shutil +import string +import glob + + +parser = argparse.ArgumentParser(description="Python restart test for infinity") +parser.add_argument( + "--executable_path", + type=str, + required=True, + help="Path to the executable file", +) +parser.add_argument( + "--output_dir", + type=str, + required=True, + help="Path to the output directory", +) +parser.add_argument("--failure", type=str, required=True, help="If the test failured") +args = parser.parse_args() + +executable_path = args.executable_path +output_dir = args.output_dir +failure = args.failure == "true" or args.failure == "True" + +if not os.path.isdir(output_dir): + os.makedirs(output_dir) + +if failure: + random_name = "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) + print(f"Random log file name: {random_name}") + +show_lines = 1000 + + +log_files = [] +for root, dirs, files in os.walk("."): + for file in files: + match = re.match(r"restart_test\.log\.(\d+)", file) + if match: + log_files.append((os.path.join(root, file), int(match.group(1)))) + +log_files = sorted(log_files, key=lambda x: x[1]) +print(f"Found log files: {log_files}") + +for log_file, i in log_files: + if failure: + shutil.copy(log_file, f"{output_dir}/{random_name}_{i}.log") + if i == len(log_files) - 1: + print(f"Last {show_lines} lines from {log_file}:") + with open(log_file, "r") as f: + lines = f.readlines() + for line in lines[-show_lines:]: + print(line.strip()) + +if not os.path.isfile(executable_path): + print("Error: Executable file not found") +else: + if failure: + shutil.copy(executable_path, f"{output_dir}/{random_name}.exe") + +if failure: + # copy file in /var/infinity/../FULL.*.json + copy_n = 0 + for filepath in glob.iglob(f"/var/infinity/**/FULL.*.json", recursive=True): + print(filepath) + filename = filepath.split("/")[-1] + shutil.copy(filepath, f"./{filename}") + copy_n += 1 + if copy_n == 0: + print("No FULL.*.json file found") diff --git a/scripts/prepare_restart_test_data.py b/scripts/prepare_restart_test_data.py new file mode 100644 index 0000000000..aced726062 --- /dev/null +++ b/scripts/prepare_restart_test_data.py @@ -0,0 +1,50 @@ +import argparse +import os + +filepath_pair = [ + ("benchmark/enwiki/enwiki-10w.csv", "test/data/benchmark/enwiki-10w.csv"), + ("benchmark/sift1M/sift_base.fvecs", "test/data/benchmark/sift_1m/sift_base.fvecs"), +] + +parser = argparse.ArgumentParser(description="Prepare Restart Test Data") +parser.add_argument( + "--from_dir", + type=str, + required=False, + dest="from_dir", +) +parser.add_argument( + "--op", + type=str, + required=True, + dest="op", +) + +add = True + +args = parser.parse_args() +from_dir = args.from_dir +op = args.op +if op == "add": + add = True + if from_dir is None: + raise Exception("from_dir is required for add operation") +elif op == "remove": + add = False +else: + raise Exception("Invalid operation") + +if add: + for from_file, to_file in filepath_pair: + from_path = f"{from_dir}/{from_file}" + to_dir = os.path.dirname(to_file) + if not os.path.exists(to_dir): + os.makedirs(to_dir) + print(f"rm -f {to_file}") + os.system(f"rm -f {to_file}") + print(f"cp {from_path} {to_file}") + os.system(f"cp {from_path} {to_file}") +else: + for from_file, to_file in filepath_pair: + print(f"rm -f {to_file}") + os.system(f"rm -f {to_file}") diff --git a/src/bin/infinity_main.cpp b/src/bin/infinity_main.cpp index eafbe99448..5c67d2ecd8 100644 --- a/src/bin/infinity_main.cpp +++ b/src/bin/infinity_main.cpp @@ -193,12 +193,12 @@ auto main(int argc, char **argv) -> int { #elif THRIFT_SERVER_TYPE == 1 i32 thrift_server_pool_size = InfinityContext::instance().config()->ConnectionPoolSize(); - non_block_pool_thrift_server.Init(thrift_server_port, thrift_server_pool_size); + non_block_pool_thrift_server.Init(InfinityContext::instance().config()->ServerAddress(), thrift_server_port, thrift_server_pool_size); non_block_pool_thrift_thread = infinity::Thread([&]() { non_block_pool_thrift_server.Start(); }); #else - threaded_thrift_server.Init(thrift_server_port); + threaded_thrift_server.Init(InfinityContext::instance().config()->ServerAddress(), thrift_server_port); threaded_thrift_thread = infinity::Thread([&]() { threaded_thrift_server.Start(); }); #endif diff --git a/src/main/config.cpp b/src/main/config.cpp index 3e7517bf6d..4f2828cea6 100644 --- a/src/main/config.cpp +++ b/src/main/config.cpp @@ -115,6 +115,11 @@ Status Config::ParseTimeInfo(const String &time_info, i64 &time_seconds) { Status Config::Init(const SharedPtr &config_path, DefaultConfig *default_config) { toml::table config_toml{}; + if (config_path.get() != nullptr) { + LOG_INFO(fmt::format("Config file: {}", *config_path)); + } else { + LOG_INFO("No config file is given, use default configs."); + } if (config_path.get() == nullptr || config_path->empty() || !VirtualStore::Exists(std::filesystem::absolute(*config_path))) { if (config_path.get() == nullptr || config_path->empty()) { fmt::print("No config file is given, use default configs.\n"); @@ -2377,7 +2382,7 @@ void Config::PrintAll() { fmt::print(" - cleanup_interval: {}\n", Utility::FormatTimeInfo(CleanupInterval())); fmt::print(" - compact_interval: {}\n", Utility::FormatTimeInfo(CompactInterval())); fmt::print(" - optimize_index_interval: {}\n", Utility::FormatTimeInfo(OptimizeIndexInterval())); - fmt::print(" - memindex_capacity: {}\n", Utility::FormatByteSize(MemIndexCapacity())); + fmt::print(" - memindex_capacity: {}\n", MemIndexCapacity()); // mem index capacity is line number fmt::print(" - storage_type: {}\n", ToString(StorageType())); switch(StorageType() ) { case StorageType::kLocal: { diff --git a/src/storage/buffer/file_worker/file_worker.cpp b/src/storage/buffer/file_worker/file_worker.cpp index c562e13697..362f4b62ca 100644 --- a/src/storage/buffer/file_worker/file_worker.cpp +++ b/src/storage/buffer/file_worker/file_worker.cpp @@ -110,8 +110,16 @@ void FileWorker::ReadFromFile(bool from_spill) { handler = PersistResultHandler(persistence_manager_); } String read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_); + Optional>> defer_fn; if (use_object_cache) { PersistReadResult result = persistence_manager_->GetObjCache(read_path); + defer_fn.emplace(([&]() { + if (use_object_cache && obj_addr_.Valid()) { + String read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_); + PersistWriteResult res = persistence_manager_->PutObjCache(read_path); + handler.HandleWriteResult(res); + } + })); obj_addr_ = handler.HandleReadResult(result); if (!obj_addr_.Valid()) { String error_message = fmt::format("Failed to find object for local path {}", read_path); @@ -131,12 +139,7 @@ void FileWorker::ReadFromFile(bool from_spill) { file_size = file_handle->FileSize(); } file_handle_ = std::move(file_handle); - DeferFn defer_fn([&]() { - if (use_object_cache && obj_addr_.Valid()) { - String read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_); - PersistWriteResult res = persistence_manager_->PutObjCache(read_path); - handler.HandleWriteResult(res); - } + DeferFn defer_fn2([&]() { file_handle_ = nullptr; }); ReadFromFileImpl(file_size); diff --git a/src/storage/compaction/DBT_compaction_alg.cpp b/src/storage/compaction/DBT_compaction_alg.cpp index 89066ac5e1..798f44cb60 100644 --- a/src/storage/compaction/DBT_compaction_alg.cpp +++ b/src/storage/compaction/DBT_compaction_alg.cpp @@ -58,6 +58,7 @@ Vector SegmentLayer::PickCompacting(TransactionID txn_id, SizeT { Vector> segments; for (auto &[segment_id, segment_entry] : segments_) { + // LOG_INFO(fmt::format("Get rowcount of segment: {}", segment_id)); segments.emplace_back(segment_entry, segment_entry->actual_row_count()); } Vector idx(segment_n); @@ -124,6 +125,7 @@ Vector DBTCompactionAlg::CheckCompaction(TransactionID txn_id) { for (int layer = cur_layer_n - 1; layer >= 0; --layer) { auto &segment_layer = segment_layers_[layer]; if (segment_layer.LayerSize() >= config_.m_) { + // LOG_INFO(fmt::format("Get rowcount of table {}", *table_entry_->GetTableName())); Vector compact_segments = segment_layer.PickCompacting(txn_id, config_.m_, max_segment_capacity_); if (compact_segments.empty()) { continue; diff --git a/src/storage/compaction_process.cpp b/src/storage/compaction_process.cpp index ff01ab40a1..4328aacf81 100644 --- a/src/storage/compaction_process.cpp +++ b/src/storage/compaction_process.cpp @@ -81,9 +81,14 @@ void CompactionProcessor::DoCompact() { for (const auto &[statement, txn] : statements) { BGQueryContextWrapper wrapper(txn); BGQueryState state; - bool res = wrapper.query_context_->ExecuteBGStatement(statement.get(), state); - if (res) { - wrappers.emplace_back(std::move(wrapper), std::move(state)); + try { + bool res = wrapper.query_context_->ExecuteBGStatement(statement.get(), state); + if (res) { + wrappers.emplace_back(std::move(wrapper), std::move(state)); + } + } catch (const std::exception &e) { + LOG_CRITICAL(fmt::format("DoCompact failed: {}", e.what())); + throw; } } for (auto &[wrapper, query_state] : wrappers) { diff --git a/src/storage/io/s3_client_minio.cpp b/src/storage/io/s3_client_minio.cpp index e65e8318c9..f1e3520fb1 100644 --- a/src/storage/io/s3_client_minio.cpp +++ b/src/storage/io/s3_client_minio.cpp @@ -29,11 +29,12 @@ Status S3ClientMinio::DownloadObject(const String &bucket_name, const String &ob args.filename = file_path; // Call download object. + LOG_INFO(fmt::format("Downloading object {} from {} to {}", object_name, bucket_name, file_path)); minio::s3::DownloadObjectResponse resp = client_->DownloadObject(args); // Handle response. if (resp) { - LOG_TRACE(fmt::format("{} downloaded to {} successfully", file_path, object_name)); + LOG_INFO(fmt::format("{} downloaded to {} successfully", file_path, object_name)); } else { UnrecoverableError("unable to download object; " + resp.Error().String()); } @@ -48,11 +49,12 @@ Status S3ClientMinio::UploadObject(const String &bucket_name, const String &obje args.filename = file_path; // Call upload object. + LOG_INFO(fmt::format("Uploading object {} to {} from {}", object_name, bucket_name, file_path)); minio::s3::UploadObjectResponse resp = client_->UploadObject(args); // Handle response. if (resp) { - LOG_TRACE(fmt::format("{} uploaded to {} successfully", file_path, object_name)); + LOG_INFO(fmt::format("{} uploaded to {} successfully", file_path, object_name)); } else { UnrecoverableError("unable to upload object; " + resp.Error().String()); } @@ -66,11 +68,12 @@ Status S3ClientMinio::RemoveObject(const String &bucket_name, const String &obje args.object = object_name; // Call remove object. + LOG_INFO(fmt::format("Removing object {} from {}", object_name, bucket_name)); minio::s3::RemoveObjectResponse resp = client_->RemoveObject(args); // Handle response. if (resp) { - LOG_TRACE(fmt::format("{} is removed from {} successfully", object_name, bucket_name)); + LOG_INFO(fmt::format("{} is removed from {} successfully", object_name, bucket_name)); } else { UnrecoverableError("unable to remove object; " + resp.Error().String()); } diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 42bd393e90..250164f2f1 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -584,7 +584,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe auto *pm = InfinityContext::instance().persistence_manager(); for (auto &op : delta_ops) { auto type = op->GetType(); - LOG_INFO(fmt::format("Load delta op {}", op->ToString())); + // LOG_INFO(fmt::format("Load delta op {}", op->ToString())); auto commit_ts = op->commit_ts_; auto txn_id = op->txn_id_; auto begin_ts = op->begin_ts_; @@ -799,7 +799,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe auto *block_entry = segment_entry->GetBlockEntryByID(block_id).get(); if (merge_flag == MergeFlag::kDelete) { block_entry->DropColumnReplay(column_id); - } else if (merge_flag == MergeFlag::kNew) { + } else if (merge_flag == MergeFlag::kNew || merge_flag == MergeFlag::kUpdate) { block_entry->AddColumnReplay(BlockColumnEntry::NewReplayBlockColumnEntry(block_entry, column_id, buffer_mgr, @@ -807,8 +807,6 @@ void Catalog::LoadFromEntryDelta(UniquePtr delta_entry, Buffe last_chunk_offset, commit_ts), column_id); - } else if (merge_flag == MergeFlag::kUpdate) { - // do nothing } else { UnrecoverableError(fmt::format("Unsupported merge flag {} for column entry {}", (i8)merge_flag, column_id)); } @@ -1133,6 +1131,17 @@ void Catalog::AddDeltaEntry(UniquePtr delta_entry) { global_c void Catalog::PickCleanup(CleanupScanner *scanner) { db_meta_map_.PickCleanup(scanner); } +void Catalog::InitCompactionAlg(TxnTimeStamp system_start_ts) { + TransactionID txn_id = 0; // fake txn id + Vector db_entries = this->Databases(txn_id, system_start_ts); + for (auto *db_entry : db_entries) { + Vector table_entries = db_entry->TableCollections(txn_id, system_start_ts); + for (auto *table_entry : table_entries) { + table_entry->InitCompactionAlg(system_start_ts); + } + } +} + void Catalog::MemIndexCommit() { auto db_meta_map_guard = db_meta_map_.GetMetaMap(); for (auto &[_, db_meta] : *db_meta_map_guard) { diff --git a/src/storage/meta/catalog.cppm b/src/storage/meta/catalog.cppm index 288a9108b3..622b93128e 100644 --- a/src/storage/meta/catalog.cppm +++ b/src/storage/meta/catalog.cppm @@ -301,6 +301,8 @@ public: void PickCleanup(CleanupScanner *scanner); + void InitCompactionAlg(TxnTimeStamp system_start_ts); + private: UniquePtr global_catalog_delta_entry_{MakeUnique()}; }; diff --git a/src/storage/meta/entry/block_entry.cpp b/src/storage/meta/entry/block_entry.cpp index 03bfeb9aef..d7893483e4 100644 --- a/src/storage/meta/entry/block_entry.cpp +++ b/src/storage/meta/entry/block_entry.cpp @@ -165,6 +165,10 @@ void BlockEntry::UpdateBlockReplay(SharedPtr block_entry, String blo } } +BlockColumnEntry *BlockEntry::GetColumnBlockEntry(SizeT column_idx) const { + return columns_[column_idx].get(); +} + SizeT BlockEntry::row_count(TxnTimeStamp check_ts) const { std::shared_lock lock(rw_locker_); if (check_ts >= max_row_ts_) @@ -600,11 +604,11 @@ SharedPtr BlockEntry::DetermineDir(const String &parent_dir, BlockID blo } void BlockEntry::AddColumnReplay(UniquePtr column_entry, ColumnID column_id) { - auto iter = std::find_if(columns_.begin(), columns_.end(), [&](const auto &column) { return column->column_id() == column_id; }); - if (iter == columns_.end()) { - columns_.emplace_back(std::move(column_entry)); - } else { + auto iter = std::lower_bound(columns_.begin(), columns_.end(), column_id, [&](const auto &column, ColumnID column_id) { return column->column_id() < column_id; }); + if (iter != columns_.end() && (*iter)->column_id() == column_id) { *iter = std::move(column_entry); + } else { + columns_.insert(iter, std::move(column_entry)); } } diff --git a/src/storage/meta/entry/block_entry.cppm b/src/storage/meta/entry/block_entry.cppm index a663913a2e..fb0a1634f8 100644 --- a/src/storage/meta/entry/block_entry.cppm +++ b/src/storage/meta/entry/block_entry.cppm @@ -150,7 +150,7 @@ public: // Relative to the `data_dir` config item const SharedPtr &block_dir() const { return block_dir_; } - BlockColumnEntry *GetColumnBlockEntry(SizeT column_idx) const { return columns_[column_idx].get(); } + BlockColumnEntry *GetColumnBlockEntry(SizeT column_idx) const; FastRoughFilter *GetFastRoughFilter() { return fast_rough_filter_.get(); } diff --git a/src/storage/meta/entry/chunk_index_entry.cpp b/src/storage/meta/entry/chunk_index_entry.cpp index 81adda15b7..fc3e85b85a 100644 --- a/src/storage/meta/entry/chunk_index_entry.cpp +++ b/src/storage/meta/entry/chunk_index_entry.cpp @@ -62,9 +62,6 @@ Vector ChunkIndexEntry::DecodeIndex(std::string_view encode) { } String ChunkIndexEntry::EncodeIndex(const ChunkID chunk_id, const String &base_name, const SegmentIndexEntry *segment_index_entry) { - if (!base_name.empty()) { - return fmt::format("{}#{}", segment_index_entry->encode(), base_name); - } return fmt::format("{}#{}", segment_index_entry->encode(), chunk_id); } diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index a86309b703..1d78a6d110 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -101,7 +101,7 @@ SegmentIndexEntry::SegmentIndexEntry(const SegmentIndexEntry &other) min_ts_ = other.min_ts_; max_ts_ = other.max_ts_; checkpoint_ts_ = other.checkpoint_ts_; - next_chunk_id_ = other.next_chunk_id_; + next_chunk_id_ = other.next_chunk_id_.load(); memory_hnsw_index_ = other.memory_hnsw_index_; memory_ivf_index_ = other.memory_ivf_index_; @@ -178,7 +178,7 @@ void SegmentIndexEntry::UpdateSegmentIndexReplay(SharedPtr ne assert(new_entry->index_dir_ == index_dir_); min_ts_ = new_entry->min_ts_; max_ts_ = new_entry->max_ts_; - next_chunk_id_ = new_entry->next_chunk_id_; + next_chunk_id_ = new_entry->next_chunk_id_.load(); } // String SegmentIndexEntry::IndexFileName(SegmentID segment_id) { return fmt::format("seg{}.idx", segment_id); } @@ -393,17 +393,17 @@ void SegmentIndexEntry::AddWalIndexDump(ChunkIndexEntry *dumped_index_entry, Txn txn->AddWalCmd(MakeShared(db_name, table_name, index_name, segment_id_, std::move(chunk_infos), std::move(deprecate_chunk_ids))); } -void SegmentIndexEntry::MemIndexLoad(const String &base_name, RowID base_row_id) { - const IndexBase *index_base = table_index_entry_->index_base(); - if (index_base->index_type_ != IndexType::kFullText) - return; - // Init the mem index from previously spilled one. - assert(memory_indexer_.get() == nullptr); - const IndexFullText *index_fulltext = static_cast(index_base); - String full_path = Path(InfinityContext::instance().config()->DataDir()) / *table_index_entry_->index_dir(); - memory_indexer_ = MakeUnique(full_path, base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_); - memory_indexer_->Load(); -} +// void SegmentIndexEntry::MemIndexLoad(const String &base_name, RowID base_row_id) { +// const IndexBase *index_base = table_index_entry_->index_base(); +// if (index_base->index_type_ != IndexType::kFullText) +// return; +// // Init the mem index from previously spilled one. +// assert(memory_indexer_.get() == nullptr); +// const IndexFullText *index_fulltext = static_cast(index_base); +// String full_path = Path(InfinityContext::instance().config()->DataDir()) / *table_index_entry_->index_dir(); +// memory_indexer_ = MakeUnique(full_path, base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_); +// memory_indexer_->Load(); +// } u32 SegmentIndexEntry::MemIndexRowCount() { const IndexBase *index_base = table_index_entry_->index_base(); @@ -616,6 +616,40 @@ Status SegmentIndexEntry::CreateIndexDo(atomic_u64 &create_index_idx) { return Status::OK(); } +void SegmentIndexEntry::GetChunkIndexEntries(Vector> &chunk_index_entries, + SharedPtr &memory_indexer, + Txn *txn) { + std::shared_lock lock(rw_locker_); + chunk_index_entries.clear(); + SizeT num = chunk_index_entries_.size(); + for (SizeT i = 0; i < num; i++) { + auto &chunk_index_entry = chunk_index_entries_[i]; + bool add = chunk_index_entry->CheckVisible(txn); + LOG_INFO(fmt::format("GetChunkIndexEntries, CheckVisible ret: {}, chunk_id: {}, deprecate ts: {}", + add, + chunk_index_entry->chunk_id_, + chunk_index_entry->deprecate_ts_.load())); + if (add) { + chunk_index_entries.push_back(chunk_index_entry); + } + } + std::sort(std::begin(chunk_index_entries), + std::end(chunk_index_entries), + [](const SharedPtr &lhs, const SharedPtr &rhs) noexcept { + return (lhs->base_rowid_ < rhs->base_rowid_ || (lhs->base_rowid_ == rhs->base_rowid_ && lhs->row_count_ < rhs->row_count_)); + }); + memory_indexer = memory_indexer_; +} + +void SegmentIndexEntry::RemoveChunkIndexEntry(ChunkIndexEntry *chunk_index_entry) { + RowID base_rowid = chunk_index_entry->base_rowid_; + std::unique_lock lock(rw_locker_); + chunk_index_entries_.erase(std::remove_if(chunk_index_entries_.begin(), + chunk_index_entries_.end(), + [base_rowid](const SharedPtr &entry) { return entry->base_rowid_ == base_rowid; }), + chunk_index_entries_.end()); +} + void SegmentIndexEntry::CommitSegmentIndex(TransactionID txn_id, TxnTimeStamp commit_ts) { std::unique_lock lock(rw_locker_); @@ -890,7 +924,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_ using IndexT = std::decay_t; using SparseRefT = SparseVecRef; - OneColumnIterator iter(segment_entry, buffer_mgr, column_def->id(), begin_ts); + CappedOneColumnIterator iter(segment_entry, buffer_mgr, column_def->id(), begin_ts, row_count); index->AddDocs(std::move(iter)); } }, @@ -999,12 +1033,41 @@ SharedPtr SegmentIndexEntry::AddChunkIndexEntryReplayWal(ChunkI TxnTimeStamp commit_ts, TxnTimeStamp deprecate_ts, BufferManager *buffer_mgr) { - if (chunk_id != next_chunk_id_) { - UnrecoverableError(fmt::format("Chunk id: {} is not equal to next chunk id: {}", chunk_id, next_chunk_id_)); + LOG_INFO(fmt::format("AddChunkIndexEntryReplayWal chunk_id: {} deprecate_ts: {}, base_rowid: {}, row_count: {} to to segment: {}", + chunk_id, + + deprecate_ts, + base_rowid.ToUint64(), + row_count, + segment_id_)); + SharedPtr chunk_index_entry = + ChunkIndexEntry::NewReplayChunkIndexEntry(chunk_id, this, base_name, base_rowid, row_count, commit_ts, deprecate_ts, buffer_mgr); + assert(std::is_sorted(chunk_index_entries_.begin(), chunk_index_entries_.end(), [](const SharedPtr &lhs, const SharedPtr &rhs) { + return lhs->chunk_id_ < rhs->chunk_id_; + })); + auto iter = std::lower_bound(chunk_index_entries_.begin(), chunk_index_entries_.end(), chunk_id, [&](const SharedPtr &entry, ChunkID id) { + return entry->chunk_id_ < id; + }); + if (iter != chunk_index_entries_.end() && (*iter)->chunk_id_ == chunk_id) { + UnrecoverableError(fmt::format("Chunk ID: {} already exists in segment: {}", chunk_id, segment_id_)); } - auto ret = this->AddChunkIndexEntryReplay(chunk_id, table_entry, base_name, base_rowid, row_count, commit_ts, deprecate_ts, buffer_mgr); - ++next_chunk_id_; - return ret; + chunk_index_entries_.insert(iter, chunk_index_entry); + ChunkID old_next_chunk_id = next_chunk_id_; + next_chunk_id_ = std::max(next_chunk_id_.load(), chunk_index_entries_.back()->chunk_id_ + 1); + LOG_INFO(fmt::format("AddChunkIndexEntryReplayWal, old_next_chunk_id: {}, next_chunk_id_, chunk_id: {}", old_next_chunk_id, next_chunk_id_, chunk_id)); + if (table_index_entry_->table_index_def()->index_type_ == IndexType::kFullText) { + try { + u64 column_length_sum = chunk_index_entry->GetColumnLengthSum(); + UpdateFulltextColumnLenInfo(column_length_sum, row_count); + } catch (const UnrecoverableException &e) { + String msg(e.what()); + if (!msg.find("No such file or directory")) { + throw e; + } + LOG_WARN("Fulltext index file not found, skip update column length info"); + } + }; + return chunk_index_entry; } SharedPtr SegmentIndexEntry::AddChunkIndexEntryReplay(ChunkID chunk_id, @@ -1015,18 +1078,28 @@ SharedPtr SegmentIndexEntry::AddChunkIndexEntryReplay(ChunkID c TxnTimeStamp commit_ts, TxnTimeStamp deprecate_ts, BufferManager *buffer_mgr) { + if (chunk_id >= next_chunk_id_) { + UnrecoverableError(fmt::format("Chunk ID: {} is greater than next chunk ID: {}", chunk_id, next_chunk_id_)); + } + LOG_INFO(fmt::format("AddChunkIndexEntryReplay chunk_id: {} deprecate_ts: {}, base_rowid: {}, row_count: {} to to segment: {}", + chunk_id, + + deprecate_ts, + base_rowid.ToUint64(), + row_count, + segment_id_)); SharedPtr chunk_index_entry = ChunkIndexEntry::NewReplayChunkIndexEntry(chunk_id, this, base_name, base_rowid, row_count, commit_ts, deprecate_ts, buffer_mgr); - bool add = false; - for (auto &chunk : chunk_index_entries_) { - if (chunk->chunk_id_ == chunk_id) { - chunk = chunk_index_entry; - add = true; - break; - } - } - if (!add) { - chunk_index_entries_.push_back(chunk_index_entry); + assert(std::is_sorted(chunk_index_entries_.begin(), chunk_index_entries_.end(), [](const SharedPtr &lhs, const SharedPtr &rhs) { + return lhs->chunk_id_ < rhs->chunk_id_; + })); + auto iter = std::lower_bound(chunk_index_entries_.begin(), chunk_index_entries_.end(), chunk_id, [&](const SharedPtr &entry, ChunkID id) { + return entry->chunk_id_ < id; + }); + if (iter != chunk_index_entries_.end() && (*iter)->chunk_id_ == chunk_id) { + *iter = chunk_index_entry; + } else { + chunk_index_entries_.insert(iter, chunk_index_entry); } if (table_index_entry_->table_index_def()->index_type_ == IndexType::kFullText) { try { @@ -1066,7 +1139,7 @@ nlohmann::json SegmentIndexEntry::Serialize(TxnTimeStamp max_commit_ts) { index_entry_json["commit_ts"] = this->commit_ts_.load(); index_entry_json["min_ts"] = this->min_ts_; index_entry_json["max_ts"] = this->max_ts_; - index_entry_json["next_chunk_id"] = this->next_chunk_id_; + index_entry_json["next_chunk_id"] = this->next_chunk_id_.load(); index_entry_json["checkpoint_ts"] = this->checkpoint_ts_; // TODO shenyushi:: use fields in BaseEntry for (auto &chunk_index_entry : chunk_index_entries_) { diff --git a/src/storage/meta/entry/segment_index_entry.cppm b/src/storage/meta/entry/segment_index_entry.cppm index 8372448ac7..3d7ac9425c 100644 --- a/src/storage/meta/entry/segment_index_entry.cppm +++ b/src/storage/meta/entry/segment_index_entry.cppm @@ -126,8 +126,8 @@ public: void AddWalIndexDump(ChunkIndexEntry *dumped_index_entry, Txn *txn, Vector chunk_ids = {}); - // Init the mem index from previously spilled one. - void MemIndexLoad(const String &base_name, RowID base_row_id); + // // Init the mem index from previously spilled one. + // void MemIndexLoad(const String &base_name, RowID base_row_id); // Populate index entirely for the segment void PopulateEntirely(const SegmentEntry *segment_entry, Txn *txn, const PopulateEntireConfig &config); @@ -138,32 +138,9 @@ public: Status CreateIndexDo(atomic_u64 &create_index_idx); - void GetChunkIndexEntries(Vector> &chunk_index_entries, SharedPtr &memory_indexer, Txn *txn = nullptr) { - std::shared_lock lock(rw_locker_); - chunk_index_entries.clear(); - SizeT num = chunk_index_entries_.size(); - for (SizeT i = 0; i < num; i++) { - auto &chunk_index_entry = chunk_index_entries_[i]; - if (chunk_index_entry->CheckVisible(txn)) { - chunk_index_entries.push_back(chunk_index_entry); - } - } - std::sort(std::begin(chunk_index_entries), - std::end(chunk_index_entries), - [](const SharedPtr &lhs, const SharedPtr &rhs) noexcept { - return (lhs->base_rowid_ < rhs->base_rowid_ || (lhs->base_rowid_ == rhs->base_rowid_ && lhs->row_count_ < rhs->row_count_)); - }); - memory_indexer = memory_indexer_; - } + void GetChunkIndexEntries(Vector> &chunk_index_entries, SharedPtr &memory_indexer, Txn *txn = nullptr); - void RemoveChunkIndexEntry(ChunkIndexEntry *chunk_index_entry) { - RowID base_rowid = chunk_index_entry->base_rowid_; - std::unique_lock lock(rw_locker_); - chunk_index_entries_.erase(std::remove_if(chunk_index_entries_.begin(), - chunk_index_entries_.end(), - [base_rowid](const SharedPtr &entry) { return entry->base_rowid_ == base_rowid; }), - chunk_index_entries_.end()); - } + void RemoveChunkIndexEntry(ChunkIndexEntry *chunk_index_entry); void ReplaceChunkIndexEntries(TxnTableStore *txn_table_store, SharedPtr merged_chunk_index_entry, @@ -274,7 +251,7 @@ private: TxnTimeStamp min_ts_{0}; // Indicate the commit_ts which create this SegmentIndexEntry TxnTimeStamp max_ts_{0}; // Indicate the max commit_ts which update data inside this SegmentIndexEntry TxnTimeStamp checkpoint_ts_{0}; - ChunkID next_chunk_id_{0}; + Atomic next_chunk_id_{0}; Vector> chunk_index_entries_{}; diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index ec3eb8b013..2322e0c160 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -333,9 +333,6 @@ void TableEntry::AddSegmentReplayWalCompact(SharedPtr segment_entr void TableEntry::AddSegmentReplayWal(SharedPtr new_segment) { SegmentID segment_id = new_segment->segment_id(); segment_map_[segment_id] = new_segment; - if (compaction_alg_.get() != nullptr) { - compaction_alg_->AddSegment(new_segment.get()); - } next_segment_id_++; } @@ -347,9 +344,6 @@ void TableEntry::AddSegmentReplay(SharedPtr new_segment) { String error_message = fmt::format("Segment {} already exists.", segment_id); UnrecoverableError(error_message); } - if (compaction_alg_.get() != nullptr) { - compaction_alg_->AddSegment(new_segment.get()); - } if (segment_id == unsealed_id_) { unsealed_segment_ = std::move(new_segment); } @@ -540,17 +534,17 @@ Status TableEntry::CommitCompact(TransactionID txn_id, TxnTimeStamp commit_ts, T } { - // { - // String ss = "Compact commit: " + *this->GetTableName(); - // for (const auto &[segment_store, old_segments] : compact_store.compact_data_) { - // auto *new_segment = segment_store.segment_entry_; - // ss += ", new segment: " + std::to_string(new_segment->segment_id()) + ", old segment: "; - // for (const auto *old_segment : old_segments) { - // ss += std::to_string(old_segment->segment_id_) + " "; - // } - // } - // LOG_INFO(ss); - // } + { + String ss = "Compact commit: " + *this->GetTableName(); + for (const auto &[segment_store, old_segments] : compact_store.compact_data_) { + auto *new_segment = segment_store.segment_entry_; + ss += ", new segment: " + std::to_string(new_segment->segment_id()) + ", old segment: "; + for (const auto *old_segment : old_segments) { + ss += std::to_string(old_segment->segment_id_) + " "; + } + } + LOG_INFO(ss); + } std::unique_lock lock(this->rw_locker_); for (const auto &[segment_store, old_segments] : compact_store.compact_data_) { @@ -974,9 +968,11 @@ void TableEntry::OptimizeIndex(Txn *txn) { ColumnIndexMerger column_index_merger(*table_index_entry->index_dir_, index_fulltext->flag_); column_index_merger.Merge(base_names, base_rowids, dst_base_name); + Vector old_ids; for (SizeT i = 0; i < chunk_index_entries.size(); i++) { auto &chunk_index_entry = chunk_index_entries[i]; old_chunks.push_back(chunk_index_entry.get()); + old_ids.push_back(chunk_index_entry->chunk_id_); } ChunkID chunk_id = segment_index_entry->GetNextChunkID(); SharedPtr merged_chunk_index_entry = ChunkIndexEntry::NewFtChunkIndexEntry(segment_index_entry.get(), @@ -992,6 +988,8 @@ void TableEntry::OptimizeIndex(Txn *txn) { TxnTimeStamp ts = std::max(txn->BeginTS(), txn->CommitTS()); table_index_entry->UpdateFulltextSegmentTs(ts); LOG_INFO(fmt::format("done merging {} {}", index_name, dst_base_name)); + + segment_index_entry->AddWalIndexDump(merged_chunk_index_entry.get(), txn, std::move(old_ids)); } break; } @@ -1310,9 +1308,16 @@ bool TableEntry::CheckDeleteConflict(const Vector &delete_row_ids, Transa } void TableEntry::AddSegmentToCompactionAlg(SegmentEntry *segment_entry) { + LOG_INFO(fmt::format("Add segment {} to table {} compaction algorithm. deprecate_ts: {}", + segment_entry->segment_id(), + *table_name_, + segment_entry->deprecate_ts())); if (compaction_alg_.get() == nullptr) { return; } + if (segment_entry->CheckDeprecate(UNCOMMIT_TS)) { + UnrecoverableError(fmt::format("Add deprecated segment {} to compaction algorithm", segment_entry->segment_id())); + } compaction_alg_->AddSegment(segment_entry); } @@ -1323,6 +1328,19 @@ void TableEntry::AddDeleteToCompactionAlg(SegmentID segment_id) { compaction_alg_->DeleteInSegment(segment_id); } +void TableEntry::InitCompactionAlg(TxnTimeStamp system_start_ts) { + for (auto &[segment_id, segment_entry] : segment_map_) { + if (segment_entry->CheckDeprecate(system_start_ts)) { + continue; + } + LOG_INFO(fmt::format("Add segment {} to table {} compaction algorithm. deprecate_ts: {}", + segment_entry->segment_id(), + *table_name_, + segment_entry->deprecate_ts())); + compaction_alg_->AddSegment(segment_entry.get()); + } +} + Vector TableEntry::CheckCompaction(TransactionID txn_id) { if (compaction_alg_.get() == nullptr) { return {}; @@ -1350,7 +1368,14 @@ void TableEntry::PickCleanup(CleanupScanner *scanner) { // If segment is visible by txn, txn.begin_ts < segment.deprecate_ts // If segment can be cleaned up, segment.deprecate_ts > visible_ts, and visible_ts must > txn.begin_ts // So the used segment will not be cleaned up. - if (segment->CheckDeprecate(visible_ts)) { + bool deprecate = segment->CheckDeprecate(visible_ts); + LOG_INFO(fmt::format("Check deprecate of segment {} in table {}. check_ts: {}, drepcate_ts: {}. result: {}", + segment->segment_id(), + *table_name_, + visible_ts, + segment->deprecate_ts(), + deprecate)); + if (deprecate) { cleanup_segment_ids.push_back(iter->first); scanner->AddEntry(std::move(iter->second)); iter = segment_map_.erase(iter); diff --git a/src/storage/meta/entry/table_entry.cppm b/src/storage/meta/entry/table_entry.cppm index f576db27da..90ba448ba4 100644 --- a/src/storage/meta/entry/table_entry.cppm +++ b/src/storage/meta/entry/table_entry.cppm @@ -316,6 +316,8 @@ public: void AddDeleteToCompactionAlg(SegmentID segment_id); + void InitCompactionAlg(TxnTimeStamp system_start_ts); + Vector CheckCompaction(TransactionID txn_id); bool CompactPrepare() const; diff --git a/src/storage/meta/iter/segment_iter.cppm b/src/storage/meta/iter/segment_iter.cppm index 5952009801..a2d951fff4 100644 --- a/src/storage/meta/iter/segment_iter.cppm +++ b/src/storage/meta/iter/segment_iter.cppm @@ -165,6 +165,8 @@ public: return std::make_pair(v_ptr, multi_vector_segment_offset_); } + SizeT offset() const { return segment_iter_.offset(); } + private: SegmentIter segment_iter_; SizeT ele_size_; @@ -198,6 +200,8 @@ public: return std::make_pair(sparse_vec_ref, offset); } + SizeT offset() const { return segment_iter_.offset(); } + private: SegmentIter segment_iter_; }; diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index a56f54b839..9db4de705c 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -98,6 +98,12 @@ void Storage::SetStorageMode(StorageMode target_mode) { LOG_INFO(fmt::format("Set storage from admin mode to un-init")); break; } + + i64 compact_interval = config_ptr_->CompactInterval() > 0 ? config_ptr_->CompactInterval() : 0; + i64 optimize_interval = config_ptr_->OptimizeIndexInterval() > 0 ? config_ptr_->OptimizeIndexInterval() : 0; + i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0; + i64 full_checkpoint_interval_sec = config_ptr_->FullCheckpointInterval() > 0 ? config_ptr_->FullCheckpointInterval() : 0; + i64 delta_checkpoint_interval_sec = config_ptr_->DeltaCheckpointInterval() > 0 ? config_ptr_->DeltaCheckpointInterval() : 0; switch (config_ptr_->StorageType()) { case StorageType::kLocal: { // Not init remote store @@ -158,6 +164,12 @@ void Storage::SetStorageMode(StorageMode target_mode) { LOG_INFO(fmt::format("Init a new catalog")); new_catalog_ = Catalog::NewCatalog(); } + if (compact_interval > 0) { + LOG_INFO(fmt::format("Init compaction alg")); + new_catalog_->InitCompactionAlg(system_start_ts); + } else { + LOG_INFO(fmt::format("Skip init compaction alg")); + } BuiltinFunctions builtin_functions(new_catalog_); builtin_functions.Init(); @@ -210,12 +222,6 @@ void Storage::SetStorageMode(StorageMode target_mode) { } periodic_trigger_thread_ = MakeUnique(); - i64 compact_interval = config_ptr_->CompactInterval() > 0 ? config_ptr_->CompactInterval() : 0; - i64 optimize_interval = config_ptr_->OptimizeIndexInterval() > 0 ? config_ptr_->OptimizeIndexInterval() : 0; - i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0; - i64 full_checkpoint_interval_sec = config_ptr_->FullCheckpointInterval() > 0 ? config_ptr_->FullCheckpointInterval() : 0; - i64 delta_checkpoint_interval_sec = config_ptr_->DeltaCheckpointInterval() > 0 ? config_ptr_->DeltaCheckpointInterval() : 0; - if (target_mode == StorageMode::kWritable) { periodic_trigger_thread_->full_checkpoint_trigger_ = MakeShared(full_checkpoint_interval_sec, wal_mgr_.get(), true); diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index 80b1bb18f2..7287c4a87b 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -790,7 +790,8 @@ const String AddSegmentIndexEntryOp::ToString() const { } const String AddChunkIndexEntryOp::ToString() const { - return fmt::format("AddChunkIndexEntryOp base_name: {} base_rowid: {} row_count: {} commit_ts: {} deprecate_ts: {}", + return fmt::format("AddChunkIndexEntryOp {} base_name: {} base_rowid: {} row_count: {} commit_ts: {} deprecate_ts: {}", + CatalogDeltaOperation::ToString(), base_name_, base_rowid_.ToUint64(), row_count_, diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index f23c503354..36c4dee7ca 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -638,7 +638,7 @@ i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) { // replay_entries.clear(); break; } - LOG_TRACE(wal_entry->ToString()); + // LOG_TRACE(wal_entry->ToString()); if (wal_entry->commit_ts_ > max_commit_ts) { replay_entries.push_back(wal_entry); @@ -1163,6 +1163,7 @@ void WalManager::WalCmdOptimizeReplay(WalCmdOptimize &cmd, TransactionID txn_id, } void WalManager::WalCmdDumpIndexReplay(WalCmdDumpIndex &cmd, TransactionID txn_id, TxnTimeStamp commit_ts) { + LOG_INFO(fmt::format("Replaying dump index: {}", cmd.ToString())); auto [table_index_entry, status] = storage_->catalog()->GetIndexByName(cmd.db_name_, cmd.table_name_, cmd.index_name_, txn_id, commit_ts); auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry(); auto *buffer_mgr = storage_->buffer_manager(); @@ -1207,6 +1208,8 @@ void WalManager::WalCmdDumpIndexReplay(WalCmdDumpIndex &cmd, TransactionID txn_i auto *old_chunk = segment_index_entry->GetChunkIndexEntry(old_chunk_id); if (old_chunk != nullptr) { old_chunk->DeprecateChunk(commit_ts); + } else { + LOG_WARN(fmt::format("WalCmdDumpIndexReplay: cannot find chunk id: {} in segment: {}", old_chunk_id, cmd.segment_id_)); } } } diff --git a/test/data/config/restart_test/test_fulltext/2.toml b/test/data/config/restart_test/test_fulltext/2.toml index 5904be7d0d..8d8311af99 100644 --- a/test/data/config/restart_test/test_fulltext/2.toml +++ b/test/data/config/restart_test/test_fulltext/2.toml @@ -7,10 +7,10 @@ time_zone = "utc-8" log_to_stdout = true [storage] +optimize_interval = "0s" +mem_index_capacity = 10000 [buffer] [wal] -optimize_interval = "0s" -mem_index_capacity = 10000 [resource] diff --git a/test/data/config/restart_test/test_fulltext/3.toml b/test/data/config/restart_test/test_fulltext/3.toml index 4f9b747165..ae7c08e20d 100644 --- a/test/data/config/restart_test/test_fulltext/3.toml +++ b/test/data/config/restart_test/test_fulltext/3.toml @@ -5,12 +5,13 @@ time_zone = "utc-8" [network] [log] log_to_stdout = true +log_level = "debug" [storage] +optimize_interval = "1s" +mem_index_capacity = 10000 [buffer] [wal] -optimize_interval = "1s" -mem_index_capacity = 10000 [resource] diff --git a/test/data/config/restart_test/test_insert/5.toml b/test/data/config/restart_test/test_insert/5.toml index ed77f04091..416264fae8 100644 --- a/test/data/config/restart_test/test_insert/5.toml +++ b/test/data/config/restart_test/test_insert/5.toml @@ -5,6 +5,7 @@ time_zone = "utc-8" [network] [log] log_to_stdout = true +log_level = "trace" [storage] optimize_interval = "3s" diff --git a/tools/run_pytest_parallel_continuous.py b/tools/run_pytest_parallel_continuous.py index 7488ba5484..bae449f2ef 100644 --- a/tools/run_pytest_parallel_continuous.py +++ b/tools/run_pytest_parallel_continuous.py @@ -1,12 +1,19 @@ from concurrent import futures import os -from run_pytest_parallel import commands, run_command +from run_pytest_parallel import run_command import time import argparse +commands = [ + "python3 tools/run_pysdk_remote_infinity_test.py --pytest_mark='not complex'", + "python3 tools/run_parallel_test.py --pytest_mark='not complex'", + "python3 tools/run_http_api.py --pytest_mark='not complex'", + "python3 tools/sqllogictest.py" +] LOG_PATH = "/var/infinity/log/infinity.log" -TEST_SEC = 3600 +# TEST_SEC = 3600 # 1 hour +TEST_SEC = 10 # run once def clear_infinity_log(): diff --git a/tools/run_restart_test.py b/tools/run_restart_test.py index 97c68ed80e..0f8bcf82bb 100644 --- a/tools/run_restart_test.py +++ b/tools/run_restart_test.py @@ -13,27 +13,46 @@ type=str, default="./build/Debug/src/infinity", ) + parser.add_argument( + "--slow", + type=bool, + default=False, + ) args = parser.parse_args() infinity_path = args.infinity_path + slow = args.slow current_path = os.getcwd() python_test_dir = current_path + "/python" - process = subprocess.Popen( - [ - python_executable, - "-m", - "pytest", - # "--capture=tee-sys", - f"{python_test_dir}/restart_test", - f"--infinity_path={infinity_path}", - "-x", - "-s", - "-m", - "not slow" - ] - ) + if not slow: + process = subprocess.Popen( + [ + python_executable, + "-m", + "pytest", + f"{python_test_dir}/restart_test", + f"--infinity_path={infinity_path}", + "-x", + "-s", + "-m", + "not slow", + ] + ) + else: + process = subprocess.Popen( + [ + python_executable, + "-m", + "pytest", + f"{python_test_dir}/restart_test", + f"--infinity_path={infinity_path}", + "-x", + "-s", + ] + ) process.wait() if process.returncode != 0: - raise Exception(f"An error occurred: {process.stderr}") \ No newline at end of file + print(f"An error occurred: {process.stderr}") + sys.exit(-1) diff --git a/tools/run_restart_test_continuously.py b/tools/run_restart_test_continuously.py new file mode 100644 index 0000000000..ca181a6d72 --- /dev/null +++ b/tools/run_restart_test_continuously.py @@ -0,0 +1,40 @@ + +import argparse +import os +import sys +import time + + +# TEST_SEC = 3600 # 1 hour +TEST_SEC = 10 # run once + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Restart Test Continuously") + parser.add_argument( + "-t", + "--test_sec", + type=int, + default=TEST_SEC, + dest="test_sec", + ) + parser.add_argument( + "--infinity_path", + type=str, + default="./build/Debug/src/infinity", + dest="infinity_path", + ) + args = parser.parse_args() + test_sec = args.test_sec + infinity_path = args.infinity_path + + begin_time = time.time() + test_i = 0 + + test_fail = False + while (time.time() - begin_time) < test_sec and not test_fail: + print(f"Test {test_i}") + try: + os.system(f"python3 tools/run_restart_test.py --infinity_path={infinity_path} --slow=true") + except Exception as e: + print(e) + sys.exit(1) \ No newline at end of file