From 8e06d279660876fc6eebb73a97611d9bc806fa27 Mon Sep 17 00:00:00 2001 From: Zhiyuan Liang <132966438+Ami11111@users.noreply.github.com> Date: Tue, 5 Nov 2024 11:24:41 +0800 Subject: [PATCH] Add cluster test cases (#2166) ### What problem does this PR solve? Add cluster test cases ### Type of change - [x] Test cases --------- Co-authored-by: shenyushi --- conf/learner2.toml | 69 ++++++ python/infinity_http.py | 11 +- python/test_cluster/common_values.py | 2 + python/test_cluster/conftest.py | 21 +- .../test_cluster/docker_infinity_cluster.py | 59 +---- python/test_cluster/infinity_cluster.py | 99 +++++---- python/test_cluster/requirements.txt | 1 + python/test_cluster/test_basic.py | 203 ++++++++++++++---- python/test_cluster/test_database.py | 100 +++++++++ python/test_cluster/test_delete.py | 132 ++++++++++++ python/test_cluster/test_export.py | 70 ++++++ python/test_cluster/test_import.py | 67 ++++++ python/test_cluster/test_index.py | 87 ++++++++ python/test_cluster/test_insert.py | 144 ++++++++++++- python/test_cluster/test_knn.py | 67 ++++++ python/test_cluster/test_select.py | 86 ++++++++ python/test_cluster/test_table.py | 103 +++++++++ python/test_cluster/test_update.py | 110 ++++++++++ scripts/build_tester_image.py | 6 +- src/common/utility/exception.cpp | 12 +- src/main/cluster_manager.cpp | 5 +- src/storage/wal/wal_manager.cpp | 10 +- src/storage/wal/wal_manager.cppm | 4 +- 23 files changed, 1304 insertions(+), 164 deletions(-) create mode 100644 conf/learner2.toml create mode 100644 python/test_cluster/common_values.py create mode 100644 python/test_cluster/test_database.py create mode 100644 python/test_cluster/test_delete.py create mode 100644 python/test_cluster/test_export.py create mode 100644 python/test_cluster/test_import.py create mode 100644 python/test_cluster/test_index.py create mode 100644 python/test_cluster/test_knn.py create mode 100644 python/test_cluster/test_select.py create mode 100644 python/test_cluster/test_table.py create mode 100644 python/test_cluster/test_update.py diff --git a/conf/learner2.toml b/conf/learner2.toml new file mode 100644 index 0000000000..aa17428b41 --- /dev/null +++ b/conf/learner2.toml @@ -0,0 +1,69 @@ +[general] +version = "0.5.0" +time_zone = "utc-8" +server_mode = "cluster" + +[network] +server_address = "0.0.0.0" +postgres_port = 5436 +http_port = 23825 +client_port = 23824 +connection_pool_size = 128 +peer_ip = "0.0.0.0" +peer_port = 23854 + +[log] +log_filename = "infinity.log" +log_dir = "/var/infinity/learner2/log" +log_to_stdout = true +log_file_max_size = "10GB" +log_file_rotate_count = 10 + +# trace/debug/info/warning/error/critical 6 log levels, default: info +log_level = "debug" + +[storage] +persistence_dir = "/var/infinity/learner2/persistence" +data_dir = "/var/infinity/learner2/data" +# periodically activates garbage collection: +# 0 means real-time, +# s means seconds, for example "60s", 60 seconds +# m means minutes, for example "60m", 60 minutes +# h means hours, for example "1h", 1 hour +optimize_interval = "10s" +cleanup_interval = "60s" +compact_interval = "120s" + +# dump memory index entry when it reachs the capacity +mem_index_capacity = 1048576 + +storage_type = "minio" + +[storage.object_storage] +url = "127.0.0.1:9000" +bucket_name = "infinity" +access_key = "minioadmin" +secret_key = "minioadmin" +enable_https = false + +[buffer] +buffer_manager_size = "4GB" +lru_num = 7 +temp_dir = "/var/infinity/learner2/tmp" + +memindex_memory_quota = "1GB" + +[wal] +wal_dir = "/var/infinity/learner2/wal" +full_checkpoint_interval = "86400s" +delta_checkpoint_interval = "60s" +# delta_checkpoint_threshold = 1000000000 +wal_compact_threshold = "1GB" + +# flush_at_once: write and flush log each commit +# only_write: write log, OS control when to flush the log, default +# flush_per_second: logs are written after each commit and flushed to disk per second. +wal_flush = "only_write" + +[resource] +resource_dir = "/var/infinity/learner2/resource" diff --git a/python/infinity_http.py b/python/infinity_http.py index bf6498d3f3..e5e52687af 100644 --- a/python/infinity_http.py +++ b/python/infinity_http.py @@ -146,6 +146,13 @@ def set_role_follower(self, node_name, leader_addr): self.net.raise_exception(r) return database_result() + def set_role_learner(self, node_name, leader_addr): + url = f"admin/node/current" + h = self.net.set_up_header(["accept", "content-type"]) + d = self.net.set_up_data([], {"role": "learner", "name": node_name, "address": leader_addr}) + r = self.net.request(url, "post", h, d) + self.net.raise_exception(r) + return database_result() # database def create_database(self, db_name, opt=ConflictType.Error): @@ -432,9 +439,9 @@ def list_indexes(self): self.net.raise_exception(r) r_json = r.json() index_list = [] - exists = r_json.get("tables", None) + exists = r_json.get("indexes", None) if exists is not None: - for t in r_json["tables"]: + for t in r_json["indexes"]: index_list.append(t) return database_result(index_list=index_list) diff --git a/python/test_cluster/common_values.py b/python/test_cluster/common_values.py new file mode 100644 index 0000000000..e3e0e77eee --- /dev/null +++ b/python/test_cluster/common_values.py @@ -0,0 +1,2 @@ +TEST_DATA_DIR = "test/data/" +TEST_TMP_DIR = "/var/infinity/test_data/" \ No newline at end of file diff --git a/python/test_cluster/conftest.py b/python/test_cluster/conftest.py index a5c896b36f..aa16e6aacb 100644 --- a/python/test_cluster/conftest.py +++ b/python/test_cluster/conftest.py @@ -21,21 +21,31 @@ def pytest_addoption(parser): action="store", default=9001, ) - parser.addoption("--infinity_dir", action="store", required=True) + parser.addoption( + "--infinity_dir", + action="store", + required=True, + help="Path to infinity directory. For local test, $pwd is ok", + ) parser.addoption("--docker", action="store_true", default=False) + def pytest_configure(config): - config.addinivalue_line("markers", "docker: mark test to run only when --docker option is provided") + config.addinivalue_line( + "markers", "docker: mark test to run only when --docker option is provided" + ) + def pytest_collection_modifyitems(config, items): if config.getoption("--docker"): - return # do not skip docker test + return # do not skip docker test skip_docker = pytest.mark.skip(reason="need --docker option to run") for item in items: if "docker" in item.keywords: print(f"skip {item.name}") item.add_marker(skip_docker) + def pytest_generate_tests(metafunc): infinity_path = metafunc.config.getoption("infinity_path") minio_dir = metafunc.config.getoption("minio_dir") @@ -51,7 +61,10 @@ def pytest_generate_tests(metafunc): if "docker_cluster" in metafunc.fixturenames: # skip if docker is in option and the testcase is marked with docker - if not metafunc.config.getoption("--docker") and "docker" in metafunc.definition.keywords: + if ( + not metafunc.config.getoption("--docker") + and "docker" in metafunc.definition.keywords + ): return print("Init DockerInfinityCluster") diff --git a/python/test_cluster/docker_infinity_cluster.py b/python/test_cluster/docker_infinity_cluster.py index 08f5933e51..c893bb6118 100644 --- a/python/test_cluster/docker_infinity_cluster.py +++ b/python/test_cluster/docker_infinity_cluster.py @@ -100,6 +100,7 @@ def uninit(self): [ "cd /infinity", f"pid=$(pgrep -f infinity || true)", + f"echo $pid", f"bash scripts/timeout_kill.sh {timeout} $pid", ] ) @@ -228,7 +229,6 @@ def add_node(self, node_name: str, config_path: str): try: container = docker_client.containers.get(container_name) - added = False except docker.errors.NotFound: container = docker_client.containers.run( image=self.image_name, @@ -241,10 +241,11 @@ def add_node(self, node_name: str, config_path: str): ], environment=[f"TZ={tz}"], ) - added = True - if added: + try: self.network.connect(container) + except docker.errors.APIError as e: + pass info = docker_client.api.inspect_network(self.network.id) # print(info) mock_ip = info["Containers"][container.id]["IPv4Address"] @@ -293,54 +294,4 @@ def __init_docker_params(self): if __name__ == "__main__": - infinity_path = "cmake-build-debug/src/infinity" - minio_dir = "minio" - minio_port = 9001 - cluster = DockerInfinityCluster( - infinity_path, minio_params=MinioParams(minio_dir, minio_port) - ) - try: - cluster.add_node("node1", "conf/leader.toml") - cluster.add_node("node2", "conf/follower.toml") - - print("init nodes") - - cluster.init_leader("node1") - cluster.init_follower("node2") - - time.sleep(1) - print("insert in node1") - - infinity1 = cluster.client("node1") - r = infinity1.list_databases() - - db1 = infinity1.get_database("default_db") - table1 = db1.create_table( - "table1", {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} - ) - table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) - - res_gt = pd.DataFrame( - { - "c1": (1), - "c2": ([[1.0, 2.0, 3.0, 4.0]]), - } - ).astype({"c1": dtype("int32"), "c2": dtype("object")}) - - res = table1.output(["*"]).to_df() - pd.testing.assert_frame_equal(res, res_gt) - - time.sleep(1) - print("select in node2") - - infinity2 = cluster.client("node2") - db2 = infinity2.get_database("default_db") - table2 = db2.get_table("table1") - res = table2.output(["*"]).to_df() - pd.testing.assert_frame_equal(res, res_gt) - except Exception as e: - print(e) - cluster.clear() - raise - else: - cluster.clear() + pass diff --git a/python/test_cluster/infinity_cluster.py b/python/test_cluster/infinity_cluster.py index 534e9455ca..99d6b5688c 100644 --- a/python/test_cluster/infinity_cluster.py +++ b/python/test_cluster/infinity_cluster.py @@ -34,12 +34,16 @@ def __init__(self, minio_dir: str, minio_port: int): class BaseInfinityRunner: - def __init__(self, node_name: str, executable_path: str, config_path: str): + def __init__(self, node_name: str, executable_path: str, config_path: str, init: bool = True): self.node_name = node_name self.executable_path = executable_path self.config_path = config_path self.load_config() - self.client = None + http_ip, http_port = self.http_uri() + self.add_client(f"http://{http_ip}:{http_port}/") + if init: + print(f"Initializing node {self.node_name}") + self.init(config_path) @abstractmethod def init(self, config_path: str | None): @@ -49,33 +53,6 @@ def init(self, config_path: str | None): def uninit(self): pass - def init_as_standalone(self, config_path: str | None = None): - self.init(config_path) - http_ip, http_port = self.http_uri() - print(f"add client: http://{http_ip}:{http_port}/") - self.add_client(f"http://{http_ip}:{http_port}/") - self.__init_cmd(lambda: self.client.set_role_standalone(self.node_name)) - - def init_as_admin(self, config_path: str | None = None): - #self.init(config_path) - http_ip, http_port = self.http_uri() - self.add_client(f"http://{http_ip}:{http_port}/") - self.__init_cmd(lambda: self.client.set_role_admin()) - - def init_as_leader(self, config_path: str | None = None): - self.init(config_path) - http_ip, http_port = self.http_uri() - self.add_client(f"http://{http_ip}:{http_port}/") - self.__init_cmd(lambda: self.client.set_role_leader(self.node_name)) - - def init_as_follower(self, leader_addr: str, config_path: str | None = None): - self.init(config_path) - http_ip, http_port = self.http_uri() - self.add_client(f"http://{http_ip}:{http_port}/") - self.__init_cmd( - lambda: self.client.set_role_follower(self.node_name, leader_addr) - ) - @abstractmethod def add_client(self, http_addr: str): pass @@ -109,9 +86,11 @@ def __init_cmd(self, send_f, timeout=10): class InfinityRunner(BaseInfinityRunner): - def __init__(self, node_name: str, executable_path: str, config_path: str): - super().__init__(node_name, executable_path, config_path) + def __init__( + self, node_name: str, executable_path: str, config_path: str, init=True + ): self.process = None + super().__init__(node_name, executable_path, config_path, init) def init(self, config_path: str | None): if self.process is not None: @@ -153,17 +132,26 @@ def __init__(self, executable_path: str, *, minio_params: MinioParams): self.executable_path = executable_path self.runners: dict[str, InfinityRunner] = {} self.leader_runner: InfinityRunner | None = None - + self.leader_name = None self.add_minio(minio_params) def clear(self): - for runner in self.runners.values(): - runner.uninit() + # shutdown follower and learner first + if self.leader_runner is not None: + for runner_name in self.runners.keys(): + if runner_name != self.leader_name: + self.runners[runner_name].uninit() + self.runners[self.leader_name].uninit() + else: + for runner in self.runners.values(): + runner.uninit() + self.runners.clear() + # if self.minio_container is not None: # self.minio_container.remove(force=True, v=True) - def add_node(self, node_name: str, config_path: str): - runner = InfinityRunner(node_name, self.executable_path, config_path) + def add_node(self, node_name: str, config_path: str, init=True): + runner = InfinityRunner(node_name, self.executable_path, config_path, init) if node_name in self.runners: raise ValueError(f"Node {node_name} already exists in the cluster.") self.runners[node_name] = runner @@ -191,19 +179,25 @@ def add_minio(self, minio_params: MinioParams): network="host", ) - def init_standalone(self, node_name: str): + def set_standalone(self, node_name: str): + if self.leader_runner is not None and self.leader_name == node_name: + self.leader_name = None + self.leader_runner = None if node_name not in self.runners: raise ValueError(f"Node {node_name} not found in the runners.") runner = self.runners[node_name] - runner.init_as_standalone() + runner.client.set_role_standalone(node_name) - def init_admin(self, node_name: str): + def set_admin(self, node_name: str): + if self.leader_runner is not None and self.leader_name == node_name: + self.leader_name = None + self.leader_runner = None if node_name not in self.runners: raise ValueError(f"Node {node_name} not found in the runners.") runner = self.runners[node_name] - runner.init_as_admin() + runner.client.set_role_admin() - def init_leader(self, leader_name: str): + def set_leader(self, leader_name: str): if self.leader_runner is not None: raise ValueError( f"Leader {self.leader_runner.node_name} has already been initialized." @@ -211,17 +205,31 @@ def init_leader(self, leader_name: str): if leader_name not in self.runners: raise ValueError(f"Leader {leader_name} not found in the runners.") leader_runner = self.runners[leader_name] + self.leader_name = leader_name self.leader_runner = leader_runner - leader_runner.init_as_leader() + leader_runner.client.set_role_leader(leader_name) - def init_follower(self, follower_name: str): + def set_follower(self, follower_name: str): if follower_name not in self.runners: raise ValueError(f"Follower {follower_name} not found in the runners") if self.leader_runner is None: raise ValueError("Leader has not been initialized.") follower_runner = self.runners[follower_name] leader_ip, leader_port = self.leader_addr() - follower_runner.init_as_follower(f"{leader_ip}:{leader_port}") + follower_runner.client.set_role_follower( + follower_name, f"{leader_ip}:{leader_port}" + ) + + def set_learner(self, learner_name: str): + if learner_name not in self.runners: + raise ValueError(f"Learner {learner_name} not found in the runners") + if self.leader_runner is None: + raise ValueError("Learner has not been initialized.") + learner_runner = self.runners[learner_name] + leader_ip, leader_port = self.leader_addr() + learner_runner.client.set_role_learner( + learner_name, f"{leader_ip}:{leader_port}" + ) def client(self, node_name: str) -> infinity_http | None: if node_name not in self.runners: @@ -239,6 +247,9 @@ def remove_node(self, node_name: str): runner = self.runners[node_name] runner.uninit() del self.runners[node_name] + if self.leader_name is not None and self.leader_name == node_name: + self.leader_name = None + self.leader_runner = None if __name__ == "__main__": diff --git a/python/test_cluster/requirements.txt b/python/test_cluster/requirements.txt index 5c64b46319..c589808599 100644 --- a/python/test_cluster/requirements.txt +++ b/python/test_cluster/requirements.txt @@ -2,3 +2,4 @@ tomli~=2.0.2 psutil~=6.0.0 docker~=7.1.0 tomli-w~=1.1.0 +timeout-decorator~=0.5.0 diff --git a/python/test_cluster/test_basic.py b/python/test_cluster/test_basic.py index 6d2fcdc8e6..21b7943c01 100644 --- a/python/test_cluster/test_basic.py +++ b/python/test_cluster/test_basic.py @@ -13,7 +13,7 @@ def test_standalone(cluster: InfinityCluster): cluster.add_node("test", "conf/pytest_parallel_infinity_conf.toml") - cluster.init_standalone("test") + cluster.set_standalone("test") test_client = cluster.client("test") assert test_client is not None @@ -27,8 +27,8 @@ def test_0(cluster: InfinityCluster): cluster.add_node("node1", "conf/leader.toml") cluster.add_node("node2", "conf/follower.toml") - cluster.init_leader("node1") - cluster.init_follower("node2") + cluster.set_leader("node1") + cluster.set_follower("node2") time.sleep(1) @@ -45,8 +45,8 @@ def test_mock(mock_cluster: MockInfinityCluster): cluster.add_node("node1", "conf/leader.toml") cluster.add_node("node2", "conf/follower.toml") - cluster.init_leader("node1") - cluster.init_follower("node2") + cluster.set_leader("node1") + cluster.set_follower("node2") time.sleep(1) @@ -71,8 +71,8 @@ def test_docker(docker_cluster: DockerInfinityCluster): print("init nodes") - cluster.init_leader("node1") - cluster.init_follower("node2") + cluster.set_leader("node1") + cluster.set_follower("node2") time.sleep(1) @@ -106,6 +106,8 @@ def test_tc1(cluster: InfinityCluster): show node on n2->n1 timeout, n2 follower; create table t2 on n2; # fail n1->leader + n2->admin + n2->follower show node on n1 & n2->n1 leader, n2 follower; insert data into t1 on n1; select data t1 on n1 and n2; @@ -122,14 +124,14 @@ def test_tc1(cluster: InfinityCluster): cluster.add_node("node1", "conf/leader.toml") cluster.add_node("node2", "conf/follower.toml") - cluster.init_leader("node1") + cluster.set_leader("node1") infinity1 = cluster.client("node1") res = infinity1.show_node("node1") assert(res.node_name == "node1") assert(res.node_role == "leader") assert(res.node_status == "alive") - cluster.init_follower("node2") + cluster.set_follower("node2") infinity2 = cluster.client("node2") res = infinity1.show_node("node1") assert(res.node_name == "node1") @@ -173,7 +175,7 @@ def test_tc1(cluster: InfinityCluster): print(e) - infinity1.set_role_admin() + cluster.set_admin("node1") time.sleep(1) res = infinity2.show_node("node1") assert(res.node_status == "timeout") @@ -191,7 +193,7 @@ def test_tc1(cluster: InfinityCluster): except InfinityException as e: print(e) - infinity1.set_role_leader("node1") + cluster.set_leader("node1") time.sleep(1) res = infinity1.show_node("node1") assert(res.node_name == "node1") @@ -208,47 +210,168 @@ def test_tc1(cluster: InfinityCluster): assert(res.node_name == "node1") assert(res.node_role == "leader") assert(res.node_status == "lost connection") - # TODO: reconnect leader and check the status + # reconnect leader and check the status + cluster.set_admin("node2") + cluster.set_follower("node2") + time.sleep(1) + res = infinity1.show_node("node2") + assert(res.node_name == "node2") + assert(res.node_role == "follower") + assert(res.node_status == "alive") + res = infinity2.show_node("node1") + assert(res.node_name == "node1") + assert(res.node_role == "leader") + assert(res.node_status == "alive") + table_name = "table1" db1 = infinity1.get_database("default_db") table1 = db1.get_table(table_name) + table1.insert([{"c1": 2, "c2": [1.0, 2.0, 3.0, 4.0]}]) + res_gt = pd.DataFrame( + { + "c1": (1, 2), + "c2": ([1.0, 2.0, 3.0, 4.0], [1.0, 2.0, 3.0, 4.0]), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + res = table1.output(["*"]).to_df() - print(res) + pd.testing.assert_frame_equal(res, res_gt) - res = db1.drop_table(table_name, ConflictType.Ignore) + db2 = infinity2.get_database("default_db") + table2 = db2.get_table(table_name) + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + cluster.set_admin("node2") + time.sleep(1) + try: + infinity1.show_node("node2") + except InfinityException as e: + print(e) + assert(e.error_code == 7019) # Not found node2 + try: + table2.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + except InfinityException as e: + print(e) + + cluster.set_follower("node2") + time.sleep(1) + res = infinity1.show_node("node2") + assert(res.node_name == "node2") + assert(res.node_role == "follower") + assert(res.node_status == "alive") + res = infinity2.show_node("node1") + assert(res.node_name == "node1") + assert(res.node_role == "leader") + assert(res.node_status == "alive") + + res = table1.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + res = db1.drop_table(table_name, ConflictType.Ignore) assert(res.error_code == ErrorCode.OK) - #table1.insert([{"c1": 2, "c2": [1.0, 2.0, 3.0, 4.0]}]) - #res_gt = pd.DataFrame( - # { - # "c1": (1, 2), - # "c2": ([[1.0, 2.0, 3.0, 4.0]], [[1.0, 2.0, 3.0, 4.0]]), - # } - #).astype({"c1": dtype("int32"), "c2": dtype("object")}) - - #res = table1.output(["*"]).to_df() - #pd.testing.assert_frame_equal(res, res_gt) - #res = db1.drop_table(table_name) - #assert res.error_code == ErrorCode.OK + + try: + db1.show_table(table_name) + except InfinityException as e: + print(e) + + try: + db2.show_table(table_name) + except InfinityException as e: + print(e) time.sleep(1) cluster.remove_node("node2") cluster.remove_node("node1") cluster.clear() +def test_tc2(cluster: InfinityCluster): + ''' + tc2: + n1: admin + n2: admin + n3: admin + n4: admin + n1->leader + create table t1 on n1; + insert several rows data; + n2->follower + n3->learner + n4->learner + show nodes on n1 & n2 & n3 & n4 + ''' + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + cluster.add_node("node3", "conf/learner.toml") + cluster.add_node("node4", "conf/learner2.toml") + cluster.set_admin("node1") + cluster.set_admin("node2") + cluster.set_admin("node3") + cluster.set_admin("node4") + + cluster.set_leader("node1") + infinity1 = cluster.client("node1") + db1 = infinity1.get_database("default_db") + table_name = "table1_tc2" + db1.drop_table(table_name, ConflictType.Ignore) + table1 = db1.create_table( + table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + ) + for i in range(10): + table1.insert([{"c1": i, "c2": [1.0, 2.0, 3.0, 4.0]}]) + + res_gt = pd.DataFrame( + { + "c1": (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + "c2": ([1.0, 2.0, 3.0, 4.0] for _ in range(10)), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + infinity2 = cluster.client("node2") + cluster.set_follower("node2") + + infinity3 = cluster.client("node3") + cluster.set_learner("node3") + + infinity4 = cluster.client("node4") + cluster.set_learner("node4") + + time.sleep(1) + for server in [infinity1, infinity2, infinity3, infinity4]: + res = server.show_node("node1") + assert(res.node_name == "node1") + assert(res.node_role == "leader") + assert(res.node_status == "alive") + res = server.show_node("node2") + assert(res.node_name == "node2") + assert(res.node_role == "follower") + assert(res.node_status == "alive") + res = server.show_node("node3") + assert(res.node_name == "node3") + assert(res.node_role == "learner") + assert(res.node_status == "alive") + res = server.show_node("node4") + assert(res.node_name == "node4") + assert(res.node_role == "learner") + assert(res.node_status == "alive") + + for server in [infinity1, infinity2, infinity3, infinity4]: + db = server.get_database("default_db") + table = db.get_table(table_name) + res = table.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + res = db1.drop_table(table_name) + assert(res.error_code == ErrorCode.OK) + + time.sleep(1) + cluster.remove_node("node4") + cluster.remove_node("node3") + cluster.remove_node("node2") + cluster.remove_node("node1") + cluster.clear() -''' -tc2: -n1: admin -n2: admin -n3: admin -n4: admin -n1->leader -create table t1 on n1; -insert several rows data; -n2->follower -n3->learner -n4->learner -show nodes on n1 & n2 & n3 & n4 -''' diff --git a/python/test_cluster/test_database.py b/python/test_cluster/test_database.py new file mode 100644 index 0000000000..3883cdf304 --- /dev/null +++ b/python/test_cluster/test_database.py @@ -0,0 +1,100 @@ +import time + +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException + + +class TestDatabase: + def test_create_100_db(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_count = 100 + for i in range(db_count): + print('drop test_cluster_db_name' + str(i)) + infinity1.drop_database('test_cluster_db_name' + str(i), ConflictType.Ignore) + for i in range(db_count): + print('create test_cluster_db_name' + str(i)) + infinity1.create_database('test_cluster_db_name' + str(i)) + + time.sleep(1) + dbs = infinity1.list_databases() + res_dbs = [] + for db_name in dbs.db_names: + print('db name: ' + db_name) + if db_name.startswith("test_cluster_db_name") or db_name == "default_db": + res_dbs.append(db_name) + assert len(res_dbs) == (db_count + 1) + + dbs = infinity2.list_databases() + res_dbs = [] + for db_name in dbs.db_names: + print('db name: ' + db_name) + if db_name.startswith("test_cluster_db_name") or db_name == "default_db": + res_dbs.append(db_name) + assert len(res_dbs) == (db_count + 1) + + for i in range(db_count): + print('drop test_cluster_db_name' + str(i)) + infinity1.drop_database('test_cluster_db_name' + str(i), ConflictType.Ignore) + + time.sleep(1) + dbs = infinity1.list_databases() + res_dbs = [] + for db_name in dbs.db_names: + print('db name: ' + db_name) + if db_name.startswith("test_cluster_db_name") or db_name == "default_db": + res_dbs.append(db_name) + assert len(res_dbs) == (1) + + dbs = infinity2.list_databases() + res_dbs = [] + for db_name in dbs.db_names: + print('db name: ' + db_name) + if db_name.startswith("test_cluster_db_name") or db_name == "default_db": + res_dbs.append(db_name) + assert len(res_dbs) == (1) + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() + + def test_create_database_on_follower(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + try: + infinity2.create_database("test_cluster_follower_db") + except InfinityException as e: + print(e) + assert(e.error_code == 8007) + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/python/test_cluster/test_delete.py b/python/test_cluster/test_delete.py new file mode 100644 index 0000000000..32490a34cb --- /dev/null +++ b/python/test_cluster/test_delete.py @@ -0,0 +1,132 @@ +import time + +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException +from infinity.errors import ErrorCode + + +class TestDelete: + def test_delete(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + db_obj.drop_table(table_name="test_delete", conflict_type=ConflictType.Ignore) + res = db_obj.create_table( + "test_delete", + {"c1": {"type": "int", "constraints": ["primary key", "not null"]}, + "c2": {"type": "int"}, "c3": {"type": "int"}}, ConflictType.Error) + + table_obj = db_obj.get_table("test_delete") + + res = table_obj.insert( + [{"c1": 1, "c2": 10, "c3": 100}, {"c1": 2, "c2": 20, "c3": 200}, {"c1": 3, "c2": 30, "c3": 300}, + {"c1": 4, "c2": 40, "c3": 400}]) + assert res.error_code == ErrorCode.OK + + res = table_obj.delete("c1 = 1") + assert res.error_code == ErrorCode.OK + + res = table_obj.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (2, 3, 4), 'c2': (20, 30, 40), 'c3': (200, 300, 400)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_delete") + res = table_obj_2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (2, 3, 4), 'c2': (20, 30, 40), 'c3': (200, 300, 400)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + res = table_obj.delete() + assert res.error_code == ErrorCode.OK + + res = table_obj.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (), 'c2': (), 'c3': ()}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + res = table_obj_2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (), 'c2': (), 'c3': ()}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + res = db_obj.drop_table("test_delete") + assert res.error_code == ErrorCode.OK + + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() + + def test_delete_on_follower(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + db_obj.drop_table(table_name="test_delete", conflict_type=ConflictType.Ignore) + res = db_obj.create_table( + "test_delete", + {"c1": {"type": "int", "constraints": ["primary key", "not null"]}, + "c2": {"type": "int"}, "c3": {"type": "int"}}, ConflictType.Error) + + table_obj = db_obj.get_table("test_delete") + + res = table_obj.insert( + [{"c1": 1, "c2": 10, "c3": 100}, {"c1": 2, "c2": 20, "c3": 200}, {"c1": 3, "c2": 30, "c3": 300}, + {"c1": 4, "c2": 40, "c3": 400}]) + assert res.error_code == ErrorCode.OK + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_delete") + + try: + table_obj_2.delete("c1 = 1") + except InfinityException as e: + print(e) + assert(e.error_code == 8007) + + res = table_obj.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (1, 2, 3, 4), 'c2': (10, 20, 30, 40), 'c3': (100, 200, 300, 400)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + res = table_obj_2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (1, 2, 3, 4), 'c2': (10, 20, 30, 40), 'c3': (100, 200, 300, 400)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + res = db_obj.drop_table("test_delete") + assert res.error_code == ErrorCode.OK + + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/python/test_cluster/test_export.py b/python/test_cluster/test_export.py new file mode 100644 index 0000000000..de6b0d40cb --- /dev/null +++ b/python/test_cluster/test_export.py @@ -0,0 +1,70 @@ +import time +import os +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException +from infinity.errors import ErrorCode +import common_values + +def count_lines(file_path: str): + with open(file_path, 'r') as file: + lines = file.readlines() + return len(lines) + +def delete_file(file_path: str): + if os.path.exists(file_path): + os.remove(file_path) + +class TestExport: + def test_export_csv(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + test_csv_dir = common_values.TEST_DATA_DIR + "csv/enwiki_embedding_9999.csv" + print(f"import file: {test_csv_dir}") + assert os.path.exists(test_csv_dir) + if not os.path.exists(common_values.TEST_TMP_DIR): + os.makedirs(common_values.TEST_TMP_DIR) + + db_obj.drop_table("test_export_csv", ConflictType.Ignore) + table_obj = db_obj.create_table("test_export_csv", {"doctitle": {"type": "varchar"}, "docdate": {"type": "varchar"}, "body": {"type": "varchar"}, "num": {"type": "integer"}, "vec": {"type": "vector, 4, float"}}) + res = table_obj.import_data(test_csv_dir, import_options={"file_type": "csv", "delimiter" : "\t"}) + assert res.error_code == ErrorCode.OK + test_export_csv_file_path = common_values.TEST_TMP_DIR + "leader_" +"test_export_csv.csv" + res = table_obj.export_data(test_export_csv_file_path, {"file_type": "csv", "delimiter" : "\t"}) + assert res.error_code == ErrorCode.OK + assert count_lines(test_export_csv_file_path) == 9999 + delete_file(test_export_csv_file_path) + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_export_csv") + test_export_csv_file_path = common_values.TEST_TMP_DIR + "follower_" +"test_export_csv.csv" + res = table_obj_2.export_data(test_export_csv_file_path, {"file_type": "csv", "delimiter" : "\t"}) + assert res.error_code == ErrorCode.OK + assert count_lines(test_export_csv_file_path) == 9999 + delete_file(test_export_csv_file_path) + + res = db_obj.drop_table("test_export_csv", ConflictType.Error) + assert res.error_code == ErrorCode.OK + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/python/test_cluster/test_import.py b/python/test_cluster/test_import.py new file mode 100644 index 0000000000..1419eb33f6 --- /dev/null +++ b/python/test_cluster/test_import.py @@ -0,0 +1,67 @@ +import os +import time + +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +import common_values + + +class TestImport: + def test1(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + + cluster.set_leader("node1") + cluster.set_follower("node2") + + time.sleep(1) + print("import in node1") + table_name = "test_import1" + + infinity1 = cluster.client("node1") + db_obj1 = infinity1.get_database("default_db") + db_obj1.drop_table(table_name, ConflictType.Ignore) + table_obj1 = db_obj1.create_table( + table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,3,int"}} + ) + + test_csv_dir = common_values.TEST_DATA_DIR + "csv/embedding_int_dim3.csv" + print(f"import file: {test_csv_dir}") + assert os.path.exists(test_csv_dir) + + res = table_obj1.import_data(test_csv_dir) + assert res.error_code == 0 + + res_gt = pd.DataFrame( + { + "c1": (1, 5, 9), + "c2": ([2, 3, 4], [6, 7, 8], [10, 11, 12]), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + res = table_obj1.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + time.sleep(1) + print("select in node2") + + infinity2 = cluster.client("node2") + db_obj2 = infinity2.get_database("default_db") + table_obj2 = db_obj2.get_table(table_name) + res = table_obj2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + db_obj1.drop_table(table_name) + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() diff --git a/python/test_cluster/test_index.py b/python/test_cluster/test_index.py new file mode 100644 index 0000000000..0b67fc5e09 --- /dev/null +++ b/python/test_cluster/test_index.py @@ -0,0 +1,87 @@ +import time + +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +import infinity.index as index + + +class TestIndex: + def test1(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + table_name = "test_index1" + index_name = "idx1" + + print("create index in node1") + infinity1 = cluster.client("node1") + db_obj1 = infinity1.get_database("default_db") + db_obj1.drop_table(table_name, ConflictType.Ignore) + + table_obj1 = db_obj1.create_table( + table_name, {"c1": {"type": "int"}, "c2": {"type": "varchar"}} + ) + table_obj1.create_index( + index_name, index.IndexInfo("c1", index.IndexType.Secondary) + ) + index_list1 = table_obj1.list_indexes().index_list + # print(index_list1) + assert len(index_list1) == 1 and index_list1[0]["index_name"] == index_name + + time.sleep(1) + + print("check index in node2") + infinity2 = cluster.client("node2") + db_obj2 = infinity2.get_database("default_db") + table_obj2 = db_obj2.get_table(table_name) + index_list2 = table_obj2.list_indexes().index_list + # print(index_list2) + assert len(index_list2) == 1 and index_list2[0]["index_name"] == index_name + + print("insert in node1") + table_obj1.insert( + [ + {"c1": 1, "c2": "text1"}, + {"c1": 2, "c2": "text2"}, + {"c1": 3, "c2": "text3"}, + ] + ) + res_gt = pd.DataFrame( + { + "c1": (1, 2), + "c2": ("text1", "text2"), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + res1 = table_obj1.output(["*"]).filter("c1 < 3").to_df() + pd.testing.assert_frame_equal(res1, res_gt) + + print("select in node2") + time.sleep(1) + res2 = table_obj2.output(["*"]).filter("c1 < 3").to_df() + # print(res2) + pd.testing.assert_frame_equal(res2, res_gt) + + print("drop index in node1 and check in node2") + table_obj1.drop_index(index_name) + time.sleep(1) + index_list2 = table_obj2.list_indexes().index_list + assert len(index_list2) == 0 + + print("uninit cluster") + db_obj1.drop_table(table_name) + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() diff --git a/python/test_cluster/test_insert.py b/python/test_cluster/test_insert.py index ce6d29ec0d..ae03e4add2 100644 --- a/python/test_cluster/test_insert.py +++ b/python/test_cluster/test_insert.py @@ -6,6 +6,7 @@ import time from infinity.errors import ErrorCode from infinity.common import ConflictType +import timeout_decorator class TestInsert: @@ -16,8 +17,8 @@ def __test_inner_1(self, cluster: InfinityCluster): print("init nodes") - cluster.init_leader("node1") - cluster.init_follower("node2") + cluster.set_leader("node1") + cluster.set_follower("node2") time.sleep(1) print("insert in node1") @@ -67,3 +68,142 @@ def test_insert_11(self, cluster: InfinityCluster): @pytest.mark.docker def test_insert_12(self, docker_cluster: DockerInfinityCluster): self.__test_inner_1(docker_cluster) + + # read/write when leader/follower is disconnected + @pytest.mark.docker + def test_insert_2(self, docker_cluster: DockerInfinityCluster): + try: + docker_cluster.add_node("node1", "conf/leader.toml") + docker_cluster.add_node("node2", "conf/follower.toml") + print("init nodes") + time.sleep(1) + docker_cluster.set_leader("node1") + docker_cluster.set_follower("node2") + + time.sleep(1) + print("insert in node1") + + infinity1 = docker_cluster.client("node1") + + table_name = "table2" + db1 = infinity1.get_database("default_db") + db1.drop_table(table_name, ConflictType.Ignore) + table1 = db1.create_table( + table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + ) + table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + + res_gt = pd.DataFrame( + { + "c1": (1), + "c2": ([[1.0, 2.0, 3.0, 4.0]]), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + time.sleep(1) + print("select in node2") + + infinity2 = docker_cluster.client("node2") + db2 = infinity2.get_database("default_db") + table2 = db2.get_table(table_name) + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + docker_cluster.disconnect("node2") + try: + + @timeout_decorator.timeout(1) + def noreturn_request(): + table1.insert([{"c1": 2, "c2": [5.0, 6.0, 7.0, 8.0]}]) + + noreturn_request() + + except Exception as e: + pass + docker_cluster.reconnect("node2") + + docker_cluster.disconnect("node1") + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + docker_cluster.reconnect("node1") + + db1.drop_table(table_name) + except Exception as e: + print(e) + docker_cluster.clear() + raise + else: + docker_cluster.clear() + + @pytest.mark.skip("Bug") + @pytest.mark.docker + def test_insert_3(self, docker_cluster: DockerInfinityCluster): + try: + docker_cluster.add_node("node1", "conf/leader.toml") + docker_cluster.add_node("node2", "conf/follower.toml") + print("init nodes") + time.sleep(1) + docker_cluster.set_leader("node1") + docker_cluster.set_follower("node2") + + time.sleep(1) + print("insert in node1") + + infinity1 = docker_cluster.client("node1") + + table_name = "table2" + db1 = infinity1.get_database("default_db") + db1.drop_table(table_name, ConflictType.Ignore) + table1 = db1.create_table( + table_name, {"c1": {"type": "int"}, "c2": {"type": "vector,4,float"}} + ) + table1.insert([{"c1": 1, "c2": [1.0, 2.0, 3.0, 4.0]}]) + time.sleep(1) + + # reconnect follower + docker_cluster.remove_node("node2") + docker_cluster.add_node("node2", "conf/follower.toml") + time.sleep(1) + docker_cluster.set_follower("node2") + + table1.insert([{"c1": 2, "c2": [5.0, 6.0, 7.0, 8.0]}]) + res_gt = pd.DataFrame( + { + "c1": (1, 2), + "c2": ([[1.0, 2.0, 3.0, 4.0]], [[5.0, 6.0, 7.0, 8.0]]), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + + infinity2 = docker_cluster.client("node2") + db2 = infinity2.get_database("default_db") + table2 = db2.get_table(table_name) + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + # reconnect leader + docker_cluster.remove_node("node1") + docker_cluster.add_node("node1", "conf/leader.toml") + time.sleep(1) + docker_cluster.set_leader("node1") + + table1.insert([{"c1": 3, "c2": [9.0, 10.0, 11.0, 12.0]}]) + res_gt = pd.DataFrame( + { + "c1": (1, 2, 3), + "c2": ( + [[1.0, 2.0, 3.0, 4.0]], + [[5.0, 6.0, 7.0, 8.0]], + [[9.0, 10.0, 11.0, 12.0]], + ), + } + ).astype({"c1": dtype("int32"), "c2": dtype("object")}) + res = table2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, res_gt) + + db1.drop_table(table_name) + except Exception as e: + print(e) + docker_cluster.clear() + raise + else: + docker_cluster.clear() diff --git a/python/test_cluster/test_knn.py b/python/test_cluster/test_knn.py new file mode 100644 index 0000000000..64f6806607 --- /dev/null +++ b/python/test_cluster/test_knn.py @@ -0,0 +1,67 @@ +import time +import os +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException +from infinity.errors import ErrorCode +import common_values + +class TestKnn: + def test_knn(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + test_csv_dir = common_values.TEST_DATA_DIR + "csv/tmp_20240116.csv" + print(f"import file: {test_csv_dir}") + assert os.path.exists(test_csv_dir) + if not os.path.exists(common_values.TEST_TMP_DIR): + os.makedirs(common_values.TEST_TMP_DIR) + + db_obj.drop_table("test_knn", ConflictType.Ignore) + table_obj = db_obj.create_table("test_knn", { + "variant_id": {"type": "varchar"}, + "gender_vector": {"type": "vector,4,float"}, + "color_vector": {"type": "vector,4,float"}, + "category_vector": {"type": "vector,4,float"}, + "tag_vector": {"type": "vector,4,float"}, + "other_vector": {"type": "vector,4,float"}, + "query_is_recommend": {"type": "varchar"}, + "query_gender": {"type": "varchar"}, + "query_color": {"type": "varchar"}, + "query_price": {"type": "float"} + }, ConflictType.Error) + res = table_obj.import_data(test_csv_dir, None) + assert res.error_code == ErrorCode.OK + + res = table_obj.output(["variant_id", "_row_id"]).match_dense("gender_vector", [1.0] * 4, "float", "ip", 10).to_pl() + print(res) + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_knn") + res = table_obj_2.output(["variant_id", "_row_id"]).match_dense("gender_vector", [1.0] * 4, "float", "ip", 10).to_pl() + print(res) + + res = db_obj.drop_table("test_knn", ConflictType.Error) + assert res.error_code == ErrorCode.OK + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/python/test_cluster/test_select.py b/python/test_cluster/test_select.py new file mode 100644 index 0000000000..4f56f44ef7 --- /dev/null +++ b/python/test_cluster/test_select.py @@ -0,0 +1,86 @@ +import time +import os +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException +from infinity.errors import ErrorCode +import common_values + +class TestSelect: + def test_select(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + db_obj.drop_table("test_select", ConflictType.Ignore) + table_obj = db_obj.create_table( + "test_select", { + "c1": {"type": "int", "constraints": ["primary key", "not null"]}, + "c2": {"type": "int", "constraints": ["not null"]}}, ConflictType.Error) + + res = table_obj.insert( + [{"c1": -3, "c2": -3}, {"c1": -2, "c2": -2}, {"c1": -1, "c2": -1}, {"c1": 0, "c2": 0}, {"c1": 1, "c2": 1}, + {"c1": 2, "c2": 2}, {"c1": 3, "c2": 3}]) + assert res.error_code == ErrorCode.OK + + res = table_obj.insert( + [{"c1": -8, "c2": -8}, {"c1": -7, "c2": -7}, {"c1": -6, "c2": -6}, {"c1": 7, "c2": 7}, {"c1": 8, "c2": 8}, + {"c1": 9, "c2": 9}]) + assert res.error_code == ErrorCode.OK + + + res = table_obj.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9), + 'c2': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res = table_obj.output(["c1", "c2"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9), + 'c2': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res = table_obj.output( + ["c1 + c2"]).filter("c1 = 3").to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'(c1 + c2)': (6,)}) + .astype({'(c1 + c2)': dtype('int32')})) + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_select") + res = table_obj_2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9), + 'c2': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res = table_obj_2.output(["c1", "c2"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9), + 'c2': (-3, -2, -1, 0, 1, 2, 3, -8, -7, -6, 7, 8, 9)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res = table_obj_2.output( + ["c1 + c2"]).filter("c1 = 3").to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'(c1 + c2)': (6,)}) + .astype({'(c1 + c2)': dtype('int32')})) + + res = db_obj.drop_table("test_select", ConflictType.Error) + assert res.error_code == ErrorCode.OK + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/python/test_cluster/test_table.py b/python/test_cluster/test_table.py new file mode 100644 index 0000000000..b6ab6613c1 --- /dev/null +++ b/python/test_cluster/test_table.py @@ -0,0 +1,103 @@ +import time + +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException + + +class TestTable: + def test_create_100_table(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db1 = infinity1.get_database("default_db") + table_count = 100 + for i in range(table_count): + print('drop test_cluster_table_name' + str(i)) + db1.drop_table('test_cluster_table_name' + str(i), ConflictType.Ignore) + for i in range(table_count): + print('create test_cluster_table_name' + str(i)) + db1.create_table('test_cluster_table_name' + str(i), {"c1": {"type": "int", "constraints": ["primary key"]}, "c2": {"type": "float"}}, + ConflictType.Error) + + time.sleep(1) + tables = db1.get_all_tables() + res_tables = [] + for table_name in tables: + if table_name.startswith("test_cluster_table_name") : + res_tables.append(table_name) + assert len(res_tables) == (table_count) + + db2 = infinity2.get_database("default_db") + tables = db2.get_all_tables() + res_tables = [] + for table_name in tables: + if table_name.startswith("test_cluster_table_name") : + res_tables.append(table_name) + assert len(res_tables) == (table_count) + + for i in range(table_count): + print('drop test_cluster_table_name' + str(i)) + db1.drop_table('test_cluster_table_name' + str(i), ConflictType.Ignore) + + time.sleep(1) + tables = db1.get_all_tables() + res_tables = [] + for table_name in tables: + if table_name.startswith("test_cluster_table_name") : + res_tables.append(table_name) + assert len(res_tables) == 0 + + db2 = infinity2.get_database("default_db") + tables = db2.get_all_tables() + res_tables = [] + for table_name in tables: + if table_name.startswith("test_cluster_table_name") : + res_tables.append(table_name) + assert len(res_tables) == 0 + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() + + def test_create_table_on_follower(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db = infinity2.get_database("default_db") + try: + db.create_table('test_cluster_follwer_table', {"c1": {"type": "int", "constraints": ["primary key"]}, "c2": {"type": "float"}}, + ConflictType.Error) + except InfinityException as e: + print(e) + assert(e.error_code == 8007) + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/python/test_cluster/test_update.py b/python/test_cluster/test_update.py new file mode 100644 index 0000000000..a049893bba --- /dev/null +++ b/python/test_cluster/test_update.py @@ -0,0 +1,110 @@ +import time +import os +from numpy import dtype +import pandas as pd +import pytest +from infinity_cluster import InfinityCluster +from infinity.common import ConflictType +from infinity.common import InfinityException +from infinity.errors import ErrorCode +import common_values + +class TestUpdate: + def test_update(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + db_obj.drop_table(table_name="test_update", conflict_type=ConflictType.Ignore) + + table_obj = db_obj.create_table( + "test_update", {"c1": {"type": "int", "constraints": ["primary key", "not null"]}, + "c2": {"type": "int"}, "c3": {"type": "int"}}, ConflictType.Error) + assert table_obj is not None + + res = table_obj.insert( + [{"c1": 1, "c2": 10, "c3": 100}, {"c1": 2, "c2": 20, "c3": 200}, {"c1": 3, "c2": 30, "c3": 300}, + {"c1": 4, "c2": 40, "c3": 400}]) + assert res.error_code == ErrorCode.OK + + res = table_obj.update("c1 = 1", {"c2": 90, "c3": 900}) + assert res.error_code == ErrorCode.OK + + res = table_obj.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame( + {'c1': (2, 3, 4, 1), 'c2': (20, 30, 40, 90), 'c3': (200, 300, 400, 900)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_update") + res = table_obj_2.output(["*"]).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame( + {'c1': (2, 3, 4, 1), 'c2': (20, 30, 40, 90), 'c3': (200, 300, 400, 900)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32'), 'c3': dtype('int32')})) + + res = db_obj.drop_table("test_update", ConflictType.Error) + assert res.error_code == ErrorCode.OK + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() + + def test_update_on_follower(self, cluster: InfinityCluster): + try: + cluster.add_node("node1", "conf/leader.toml") + cluster.add_node("node2", "conf/follower.toml") + + print("init nodes") + cluster.set_leader("node1") + cluster.set_follower("node2") + time.sleep(1) + + infinity1 = cluster.client("node1") + infinity2 = cluster.client("node2") + + db_obj = infinity1.get_database("default_db") + + db_obj.drop_table(table_name="test_update", conflict_type=ConflictType.Ignore) + + table_obj = db_obj.create_table( + "test_update", {"c1": {"type": "int", "constraints": ["primary key", "not null"]}, + "c2": {"type": "int"}, "c3": {"type": "int"}}, ConflictType.Error) + assert table_obj is not None + + res = table_obj.insert( + [{"c1": 1, "c2": 10, "c3": 100}, {"c1": 2, "c2": 20, "c3": 200}, {"c1": 3, "c2": 30, "c3": 300}, + {"c1": 4, "c2": 40, "c3": 400}]) + assert res.error_code == ErrorCode.OK + + time.sleep(1) + db_obj_2 = infinity2.get_database("default_db") + table_obj_2 = db_obj_2.get_table("test_update") + try: + table_obj_2.update("c1 = 1", {"c2": 90, "c3": 900}) + except InfinityException as e: + print(e) + assert(e.error_code == 8007) + + res = db_obj.drop_table("test_update", ConflictType.Error) + assert res.error_code == ErrorCode.OK + + except Exception as e: + print(e) + cluster.clear() + raise + else: + cluster.clear() \ No newline at end of file diff --git a/scripts/build_tester_image.py b/scripts/build_tester_image.py index 3c0542128a..047059035a 100644 --- a/scripts/build_tester_image.py +++ b/scripts/build_tester_image.py @@ -14,9 +14,9 @@ def image_exists(image_name): def main(image_name: str, bin_path: str): - if image_exists(image_name): - print(f"Image {image_name} already exists.") - return + # if image_exists(image_name): + # print(f"Image {image_name} already exists.") + # return # download_command = "wget -O docker-27.3.1.tgz https://download.docker.com/linux/static/stable/x86_64/docker-27.3.1.tgz" # subprocess.check_call(download_command, shell=True) diff --git a/src/common/utility/exception.cpp b/src/common/utility/exception.cpp index ead7c42bdf..0df5f9d204 100644 --- a/src/common/utility/exception.cpp +++ b/src/common/utility/exception.cpp @@ -60,12 +60,12 @@ std::string_view GetErrorMsg(const String &message) { } void UnrecoverableError(const String &message, const char *file_name, u32 line) { - auto *storage = InfinityContext::instance().storage(); - if (storage != nullptr) { - CleanupInfoTracer *cleanup_tracer = storage->cleanup_info_tracer(); - String error_msg = cleanup_tracer->GetCleanupInfo(); - LOG_ERROR(std::move(error_msg)); - } + // auto *storage = InfinityContext::instance().storage(); + // if (storage != nullptr) { + // CleanupInfoTracer *cleanup_tracer = storage->cleanup_info_tracer(); + // String error_msg = cleanup_tracer->GetCleanupInfo(); + // LOG_ERROR(std::move(error_msg)); + // } if (IS_LOGGER_INITIALIZED()) { LOG_CRITICAL(message); } diff --git a/src/main/cluster_manager.cpp b/src/main/cluster_manager.cpp index 3f04965de2..cf37c151de 100644 --- a/src/main/cluster_manager.cpp +++ b/src/main/cluster_manager.cpp @@ -428,6 +428,7 @@ Status ClusterManager::AddNodeInfo(const SharedPtr &node_info) { auto iter = other_node_map_.find(node_info->node_name_); if (iter != other_node_map_.end()) { // Duplicated node + // TODO: Update node info and not throw error. return Status::DuplicateNode(node_info->node_name_); } } @@ -792,7 +793,7 @@ Status ClusterManager::ApplySyncedLogNolock(const Vector &synced_logs) { LOG_DEBUG(fmt::format("WAL Entry: {}", entry->ToString())); last_txn_id = entry->txn_id_; last_commit_ts = entry->commit_ts_; - wal_manager->ReplayWalEntry(*entry, false); + wal_manager->ReplayWalEntry(*entry, false, false); } LOG_INFO(fmt::format("Replicated from leader: latest txn commit_ts: {}, latest txn id: {}", last_commit_ts, last_txn_id)); @@ -822,7 +823,7 @@ Status ClusterManager::ContinueStartup(const Vector &synced_logs) { } } LOG_DEBUG(fmt::format("WAL Entry: {}", entry->ToString())); - wal_manager->ReplayWalEntry(*entry, true); + wal_manager->ReplayWalEntry(*entry, true, true); last_commit_ts = entry->commit_ts_; } diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index cd856b4f0f..88a51c996a 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -745,7 +745,7 @@ i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) { last_txn_id = replay_entries[replay_count]->txn_id_; LOG_DEBUG(replay_entries[replay_count]->ToString()); - ReplayWalEntry(*replay_entries[replay_count], false); + ReplayWalEntry(*replay_entries[replay_count], false, true); } LOG_INFO(fmt::format("Latest txn commit_ts: {}, latest txn id: {}", last_commit_ts, last_txn_id)); @@ -901,7 +901,7 @@ Vector> WalManager::CollectWalEntries() const { return wal_entries; } -void WalManager::ReplayWalEntry(const WalEntry &entry, bool on_startup) { +void WalManager::ReplayWalEntry(const WalEntry &entry, bool on_startup, bool is_replay) { for (const auto &cmd : entry.cmds_) { LOG_TRACE(fmt::format("Replay wal cmd: {}, commit ts: {}", WalCmd::WalCommandTypeToString(cmd->GetType()).c_str(), entry.commit_ts_)); switch (cmd->GetType()) { @@ -939,7 +939,7 @@ void WalManager::ReplayWalEntry(const WalEntry &entry, bool on_startup) { break; } case WalCommandType::APPEND: { - WalCmdAppendReplay(*dynamic_cast(cmd.get()), entry.txn_id_, entry.commit_ts_); + WalCmdAppendReplay(*dynamic_cast(cmd.get()), entry.txn_id_, entry.commit_ts_, is_replay); break; } case WalCommandType::DELETE: { @@ -1361,7 +1361,7 @@ void WalManager::WalCmdDropColumnsReplay(WalCmdDropColumns &cmd, TransactionID t commit_ts); } -void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_id, TxnTimeStamp commit_ts) { +void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_id, TxnTimeStamp commit_ts, bool is_replay) { auto [table_entry, table_status] = storage_->catalog()->GetTableByName(cmd.db_name_, cmd.table_name_, txn_id, commit_ts); if (!table_status.ok()) { String error_message = fmt::format("Wal Replay: Get table failed {}", table_status.message()); @@ -1375,7 +1375,7 @@ void WalManager::WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_i auto append_state = MakeUnique(table_store->GetBlocks()); table_store->SetAppendState(std::move(append_state)); - Catalog::Append(table_store->GetTableEntry(), fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager(), true); + Catalog::Append(table_store->GetTableEntry(), fake_txn->TxnID(), table_store, commit_ts, storage_->buffer_manager(), is_replay); Catalog::CommitWrite(table_store->GetTableEntry(), fake_txn->TxnID(), commit_ts, table_store->txn_segments(), nullptr); } diff --git a/src/storage/wal/wal_manager.cppm b/src/storage/wal/wal_manager.cppm index f98bd7e411..0e924617e2 100644 --- a/src/storage/wal/wal_manager.cppm +++ b/src/storage/wal/wal_manager.cppm @@ -100,7 +100,7 @@ public: Vector> CollectWalEntries() const; - void ReplayWalEntry(const WalEntry &entry, bool on_startup); + void ReplayWalEntry(const WalEntry &entry, bool on_startup, bool is_replay); TxnTimeStamp GetCheckpointedTS(); @@ -121,7 +121,7 @@ private: void WalCmdDropTableReplay(const WalCmdDropTable &cmd, TransactionID txn_id, TxnTimeStamp commit_ts); void WalCmdCreateIndexReplay(const WalCmdCreateIndex &cmd, TransactionID txn_id, TxnTimeStamp commit_ts); void WalCmdDropIndexReplay(const WalCmdDropIndex &cmd, TransactionID txn_id, TxnTimeStamp commit_ts); - void WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_id, TxnTimeStamp commit_ts); + void WalCmdAppendReplay(const WalCmdAppend &cmd, TransactionID txn_id, TxnTimeStamp commit_ts, bool is_replay); // import and compact helper SharedPtr ReplaySegment(TableEntry *table_entry, const WalSegmentInfo &segment_info, TransactionID txn_id, TxnTimeStamp commit_ts);