From 941ea4bfa99e9bd71d9a4531b4f6d5cd57450035 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Thu, 12 Dec 2024 19:24:03 +0800 Subject: [PATCH] Refactor python SDK (#2364) ### What problem does this PR solve? Refactor python SDK code ### Type of change - [x] Refactoring - [x] Python SDK impacted, Need to update PyPI Signed-off-by: Jin Hai --- python/infinity_embedded/__init__.py | 8 +- python/infinity_embedded/common.py | 5 +- python/infinity_embedded/db.py | 3 +- python/infinity_embedded/errors.py | 35 +++++- python/infinity_embedded/index.py | 3 +- python/infinity_embedded/infinity.py | 4 +- .../local_infinity/client.py | 7 +- python/infinity_embedded/local_infinity/db.py | 14 +++ .../local_infinity/infinity.py | 15 ++- .../local_infinity/query_builder.py | 103 ++++++++++-------- .../infinity_embedded/local_infinity/table.py | 12 +- .../infinity_embedded/local_infinity/types.py | 7 +- .../infinity_embedded/local_infinity/utils.py | 5 +- python/infinity_embedded/table.py | 7 +- python/infinity_embedded/utils.py | 3 +- python/infinity_sdk/infinity/__init__.py | 3 +- python/infinity_sdk/infinity/common.py | 2 + .../infinity_sdk/infinity/connection_pool.py | 24 ++-- python/infinity_sdk/infinity/db.py | 1 + python/infinity_sdk/infinity/errors.py | 33 +++++- .../infinity_sdk/infinity/remote_thrift/db.py | 1 + .../infinity/remote_thrift/types.py | 18 ++- .../infinity/remote_thrift/utils.py | 1 - python/infinity_sdk/infinity/table.py | 3 +- python/infinity_sdk/infinity/utils.py | 1 + python/test_cluster/test_basic.py | 46 ++++---- python/test_cluster/test_insert.py | 4 +- 27 files changed, 250 insertions(+), 118 deletions(-) diff --git a/python/infinity_embedded/__init__.py b/python/infinity_embedded/__init__.py index 4b039c4926..764f6a567f 100644 --- a/python/infinity_embedded/__init__.py +++ b/python/infinity_embedded/__init__.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,12 +20,14 @@ # import pkg_resources # __version__ = pkg_resources.get_distribution("infinity_sdk").version -from infinity_embedded.common import URI, NetworkAddress, LOCAL_HOST, LOCAL_INFINITY_PATH, InfinityException, LOCAL_INFINITY_CONFIG_PATH +from infinity_embedded.common import URI, NetworkAddress, LOCAL_HOST, LOCAL_INFINITY_PATH, InfinityException, \ + LOCAL_INFINITY_CONFIG_PATH from infinity_embedded.infinity import InfinityConnection from infinity_embedded.local_infinity.infinity import LocalInfinityConnection from infinity_embedded.errors import ErrorCode -def connect(uri = LOCAL_INFINITY_PATH, config_path = LOCAL_INFINITY_CONFIG_PATH) -> InfinityConnection: + +def connect(uri=LOCAL_INFINITY_PATH, config_path=LOCAL_INFINITY_CONFIG_PATH) -> InfinityConnection: if isinstance(uri, str) and len(uri) != 0: return LocalInfinityConnection(uri, config_path) else: diff --git a/python/infinity_embedded/common.py b/python/infinity_embedded/common.py index d956f5c28d..a0d2e5fb87 100644 --- a/python/infinity_embedded/common.py +++ b/python/infinity_embedded/common.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from pathlib import Path from typing import Union from dataclasses import dataclass @@ -75,10 +76,12 @@ class ConflictType(object): Error = 1 Replace = 2 + class SortType(object): Asc = 0 Desc = 1 + class InfinityException(Exception): def __init__(self, error_code=0, error_message=None): self.error_code = error_code diff --git a/python/infinity_embedded/db.py b/python/infinity_embedded/db.py index 1e1693c890..930924397e 100644 --- a/python/infinity_embedded/db.py +++ b/python/infinity_embedded/db.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ from abc import ABC, abstractmethod + class Database(ABC): @abstractmethod diff --git a/python/infinity_embedded/errors.py b/python/infinity_embedded/errors.py index 1e482d3eb8..a40d15f381 100644 --- a/python/infinity_embedded/errors.py +++ b/python/infinity_embedded/errors.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -117,6 +117,15 @@ class ErrorCode(IntEnum): INVALID_EXPLAIN_TYPE = 3081, CHUNK_NOT_EXIST = 3082, NAME_MISMATCHED = 3083, + TRANSACTION_NOT_FOUND = 3084, + INVALID_DATABASE_INDEX = 3085, + INVALID_TABLE_INDEX = 3086, + FUNCTION_IS_DISABLE = 3087, + NOT_FOUND = 3088, + ERROR_INIT = 3089, + FILE_IS_OPEN = 3090, + UNKNOWN = 3091, + INVALID_QUERY_OPTION = 3092, TXN_ROLLBACK = 4001, TXN_CONFLICT = 4002, @@ -126,6 +135,7 @@ class ErrorCode(IntEnum): TOO_MANY_CONNECTIONS = 5003, CONFIGURATION_LIMIT_EXCEED = 5004, QUERY_IS_TOO_COMPLEX = 5005, + FAIL_TO_GET_SYS_INFO = 5006, QUERY_CANCELLED = 6001, QUERY_NOT_SUPPORTED = 6002, @@ -147,7 +157,26 @@ class ErrorCode(IntEnum): MUNMAP_FILE_ERROR = 7014, INVALID_FILE_FLAG = 7015, INVALID_SERVER_ADDRESS = 7016, + FAIL_TO_FUN_PYTHON = 7017, + CANT_CONNECT_SERVER = 7018, + NOT_EXIST_NODE = 7019, + DUPLICATE_NODE = 7020, + CANT_CONNECT_LEADER = 7021, + MINIO_INVALID_ACCESS_KEY = 7022, + MINIO_BUCKET_NOT_EXISTS = 7023, + INVALID_STORAGE_TYPE = 7024, + NOT_REGISTERED = 7025, + CANT_SWITCH_ROLE = 7026, + TOO_MANY_FOLLOWER = 7027, + TOO_MANY_LEARNER = 7028, INVALID_ENTRY = 8001, - NOT_FOUND_ENTRY = 8002, - EMPTY_ENTRY_LIST = 8003, + DUPLICATE_ENTRY = 8002 + NOT_FOUND_ENTRY = 8003, + EMPTY_ENTRY_LIST = 8004, + NO_WAL_ENTRY_FOUND = 8005, + WRONG_CHECKPOINT_TYPE = 8006, + INVALID_NODE_ROLE = 8007, + INVALID_NODE_STATUS = 8008, + NODE_INFO_UPDATED = 8009, + NODE_NAME_MISMATCH = 8010 diff --git a/python/infinity_embedded/index.py b/python/infinity_embedded/index.py index 1620ceb644..392185c275 100644 --- a/python/infinity_embedded/index.py +++ b/python/infinity_embedded/index.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ from infinity_embedded.embedded_infinity_ext import IndexType as LocalIndexType, WrapIndexInfo from infinity_embedded.embedded_infinity_ext import InitParameter as LocalInitParameter -from infinity_embedded.embedded_infinity_ext import WrapIndexInfo as LocalIndexInfo from infinity_embedded.errors import ErrorCode diff --git a/python/infinity_embedded/infinity.py b/python/infinity_embedded/infinity.py index 1251925629..8b40e08186 100644 --- a/python/infinity_embedded/infinity.py +++ b/python/infinity_embedded/infinity.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,8 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from abc import ABC, abstractmethod + # abstract class class InfinityConnection(ABC): def __init__(self, uri): diff --git a/python/infinity_embedded/local_infinity/client.py b/python/infinity_embedded/local_infinity/client.py index dfffc126c6..82f322ab27 100644 --- a/python/infinity_embedded/local_infinity/client.py +++ b/python/infinity_embedded/local_infinity/client.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,7 +22,8 @@ class LocalQueryResult: def __init__(self, error_code: PyErrorCode, error_msg: str, db_names=None, table_names=None, index_names=None, column_defs=None, column_fields=None, database_name=None, store_dir=None, table_count=None, comment=None, - table_name=None, index_name=None, index_type=None, index_comment=None, deleted_rows=0, extra_result=None): + table_name=None, index_name=None, index_type=None, index_comment=None, deleted_rows=0, + extra_result=None): self.error_code = error_code self.error_msg = error_msg self.db_names = db_names @@ -44,7 +45,7 @@ def __init__(self, error_code: PyErrorCode, error_msg: str, db_names=None, table class LocalInfinityClient: - def __init__(self, path: str = LOCAL_INFINITY_PATH, config_path = LOCAL_INFINITY_CONFIG_PATH): + def __init__(self, path: str = LOCAL_INFINITY_PATH, config_path=LOCAL_INFINITY_CONFIG_PATH): self.path = path Infinity.LocalInit(path, config_path) self.client = Infinity.LocalConnect() diff --git a/python/infinity_embedded/local_infinity/db.py b/python/infinity_embedded/local_infinity/db.py index d2835e3b11..81e0308120 100644 --- a/python/infinity_embedded/local_infinity/db.py +++ b/python/infinity_embedded/local_infinity/db.py @@ -1,3 +1,17 @@ +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from abc import ABC from infinity_embedded.db import Database diff --git a/python/infinity_embedded/local_infinity/infinity.py b/python/infinity_embedded/local_infinity/infinity.py index 669a007579..9201def38f 100644 --- a/python/infinity_embedded/local_infinity/infinity.py +++ b/python/infinity_embedded/local_infinity/infinity.py @@ -1,3 +1,17 @@ +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import os from infinity_embedded import InfinityConnection from abc import ABC @@ -90,7 +104,6 @@ def show_current_node(self): else: raise InfinityException(res.error_code, res.error_msg) - def search(self, db_name, table_name): self.check_connect() res = self._client.search(db_name, table_name, []) diff --git a/python/infinity_embedded/local_infinity/query_builder.py b/python/infinity_embedded/local_infinity/query_builder.py index 4238820470..3e1967e232 100644 --- a/python/infinity_embedded/local_infinity/query_builder.py +++ b/python/infinity_embedded/local_infinity/query_builder.py @@ -1,3 +1,17 @@ +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import annotations from abc import ABC @@ -18,18 +32,19 @@ from infinity_embedded.table import ExplainType as BaseExplainType from infinity_embedded.errors import ErrorCode + class Query(ABC): def __init__( - self, - columns: Optional[List[WrapParsedExpr]], - highlight: Optional[List[WrapParsedExpr]], - search: Optional[WrapSearchExpr], - filter: Optional[WrapParsedExpr], - group_by: Optional[List[WrapParsedExpr]], - limit: Optional[WrapParsedExpr], - offset: Optional[WrapParsedExpr], - sort: Optional[List[WrapOrderByExpr]], - total_hits_count: Optional[bool] + self, + columns: Optional[List[WrapParsedExpr]], + highlight: Optional[List[WrapParsedExpr]], + search: Optional[WrapSearchExpr], + filter: Optional[WrapParsedExpr], + group_by: Optional[List[WrapParsedExpr]], + limit: Optional[WrapParsedExpr], + offset: Optional[WrapParsedExpr], + sort: Optional[List[WrapOrderByExpr]], + total_hits_count: Optional[bool] ): self.columns = columns self.highlight = highlight @@ -44,16 +59,16 @@ def __init__( class ExplainQuery(Query): def __init__( - self, - columns: Optional[List[WrapParsedExpr]], - highlight: Optional[List[WrapParsedExpr]], - search: Optional[WrapSearchExpr], - filter: Optional[WrapParsedExpr], - group_by: Optional[List[WrapParsedExpr]], - limit: Optional[WrapParsedExpr], - offset: Optional[WrapParsedExpr], - sort: Optional[List[WrapOrderByExpr]], - explain_type: Optional[BaseExplainType], + self, + columns: Optional[List[WrapParsedExpr]], + highlight: Optional[List[WrapParsedExpr]], + search: Optional[WrapSearchExpr], + filter: Optional[WrapParsedExpr], + group_by: Optional[List[WrapParsedExpr]], + limit: Optional[WrapParsedExpr], + offset: Optional[WrapParsedExpr], + sort: Optional[List[WrapOrderByExpr]], + explain_type: Optional[BaseExplainType], ): super().__init__(columns, highlight, search, filter, group_by, limit, offset, sort, None) self.explain_type = explain_type @@ -84,13 +99,13 @@ def reset(self): self._total_hits_count = None def match_dense( - self, - vector_column_name: str, - embedding_data: VEC, - embedding_data_type: str, - distance_type: str, - topn: int, - knn_params: {} = None, + self, + vector_column_name: str, + embedding_data: VEC, + embedding_data_type: str, + distance_type: str, + topn: int, + knn_params: {} = None, ) -> InfinityLocalQueryBuilder: if self._search is None: self._search = WrapSearchExpr() @@ -108,7 +123,8 @@ def match_dense( if embedding_data_type == "bit": if len(embedding_data) % 8 != 0: raise InfinityException( - ErrorCode.INVALID_EMBEDDING_DATA_TYPE, f"Embeddings with data bit must have dimension of times of 8!" + ErrorCode.INVALID_EMBEDDING_DATA_TYPE, + f"Embeddings with data bit must have dimension of times of 8!" ) else: new_embedding_data = [] @@ -174,7 +190,8 @@ def match_dense( elem_type = EmbeddingDataType.kElemBFloat16 data.bf16_array_value = embedding_data else: - raise InfinityException(ErrorCode.INVALID_EMBEDDING_DATA_TYPE, f"Invalid embedding {embedding_data[0]} type") + raise InfinityException(ErrorCode.INVALID_EMBEDDING_DATA_TYPE, + f"Invalid embedding {embedding_data[0]} type") dist_type = KnnDistanceType.kInvalid if distance_type == "l2": @@ -218,12 +235,12 @@ def match_dense( return self def match_sparse( - self, - vector_column_name: str, - sparse_data: SparseVector | dict, - metric_type: str, - topn: int, - opt_params: {} = None, + self, + vector_column_name: str, + sparse_data: SparseVector | dict, + metric_type: str, + topn: int, + opt_params: {} = None, ) -> InfinityLocalQueryBuilder: if self._search is None: self._search = WrapSearchExpr() @@ -298,7 +315,7 @@ def match_sparse( return self def match_text( - self, fields: str, matching_text: str, topn: int, extra_options: Optional[dict] + self, fields: str, matching_text: str, topn: int, extra_options: Optional[dict] ) -> InfinityLocalQueryBuilder: if self._search is None: self._search = WrapSearchExpr() @@ -324,12 +341,12 @@ def match_text( return self def match_tensor( - self, - column_name: str, - query_data: VEC, - query_data_type: str, - topn: int, - extra_option: Optional[dict] = None, + self, + column_name: str, + query_data: VEC, + query_data_type: str, + topn: int, + extra_option: Optional[dict] = None, ) -> InfinityLocalQueryBuilder: if self._search is None: self._search = WrapSearchExpr() @@ -674,7 +691,7 @@ def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], {}]: limit=self._limit, offset=self._offset, sort=self._sort, - total_hits_count = self._total_hits_count, + total_hits_count=self._total_hits_count, ) self.reset() return self._table._execute_query(query) diff --git a/python/infinity_embedded/local_infinity/table.py b/python/infinity_embedded/local_infinity/table.py index a5c85e3543..7b0137de3e 100644 --- a/python/infinity_embedded/local_infinity/table.py +++ b/python/infinity_embedded/local_infinity/table.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import functools import inspect from typing import Optional, Union, List, Any @@ -59,7 +60,7 @@ def wrapper(*args, **kwargs): @name_validity_check("index_name", "Index") def create_index(self, index_name: str, index_info: IndexInfo, - conflict_type: ConflictType = ConflictType.Error, index_comment : str = ""): + conflict_type: ConflictType = ConflictType.Error, index_comment: str = ""): index_name = index_name.strip() create_index_conflict: LocalConflictType @@ -166,8 +167,8 @@ def insert(self, data: Union[INSERT_DATA, list[INSERT_DATA]]): constant_expression = get_local_constant_expr_from_python_value(value) parse_exprs.append(constant_expression) insert_row = WrapInsertRowExpr() - insert_row.columns=column_names - insert_row.values=parse_exprs + insert_row.columns = column_names + insert_row.values = parse_exprs fields.append(insert_row) res = self._conn.insert(db_name=db_name, table_name=table_name, fields=fields) @@ -384,7 +385,8 @@ def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]): raise InfinityException(ErrorCode.INVALID_PARAMETER_VALUE, "order_by_expr_list must be a list of [column_name, sort_type]") if order_by_expr[1] not in [SortType.Asc, SortType.Desc]: - raise InfinityException(ErrorCode.INVALID_PARAMETER_VALUE, "sort_type must be SortType.Asc or SortType.Desc") + raise InfinityException(ErrorCode.INVALID_PARAMETER_VALUE, + "sort_type must be SortType.Asc or SortType.Desc") self.query_builder.sort(order_by_expr_list) return self diff --git a/python/infinity_embedded/local_infinity/types.py b/python/infinity_embedded/local_infinity/types.py index 8c92cf9e8a..80f3a8f22a 100644 --- a/python/infinity_embedded/local_infinity/types.py +++ b/python/infinity_embedded/local_infinity/types.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,11 +15,10 @@ import struct import json from collections import defaultdict -from typing import Any, Tuple, Dict, List -import polars as pl +from typing import Any import numpy as np from numpy import dtype -from infinity_embedded.common import VEC, SparseVector, InfinityException, DEFAULT_MATCH_VECTOR_TOPN +from infinity_embedded.common import VEC, SparseVector, InfinityException from infinity_embedded.embedded_infinity_ext import * from infinity_embedded.errors import ErrorCode from datetime import date, time, datetime, timedelta diff --git a/python/infinity_embedded/local_infinity/utils.py b/python/infinity_embedded/local_infinity/utils.py index af48336f68..c5a55bf4b5 100644 --- a/python/infinity_embedded/local_infinity/utils.py +++ b/python/infinity_embedded/local_infinity/utils.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,11 +25,10 @@ from infinity_embedded.common import InfinityException, SparseVector from infinity_embedded.local_infinity.types import build_result, logic_type_to_dtype from infinity_embedded.utils import binary_exp_to_paser_exp -from infinity_embedded.embedded_infinity_ext import WrapInExpr, WrapParsedExpr, WrapOrderByExpr, WrapFunctionExpr, \ +from infinity_embedded.embedded_infinity_ext import WrapInExpr, WrapParsedExpr, WrapFunctionExpr, \ WrapColumnExpr, WrapConstantExpr, ParsedExprType, LiteralType from infinity_embedded.embedded_infinity_ext import WrapEmbeddingType, WrapColumnDef, WrapDataType, LogicalType, \ EmbeddingDataType, WrapSparseType, ConstraintType -from datetime import date, time, datetime, timedelta def traverse_conditions(cons, fn=None): diff --git a/python/infinity_embedded/table.py b/python/infinity_embedded/table.py index 164d9b72da..2f7b78bc54 100644 --- a/python/infinity_embedded/table.py +++ b/python/infinity_embedded/table.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,15 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod from enum import Enum -from typing import Optional, Union, Any -from infinity_embedded.index import IndexInfo from infinity_embedded.common import InfinityException, INSERT_DATA from infinity_embedded.embedded_infinity_ext import ExplainType as LocalExplainType from infinity_embedded.errors import ErrorCode + class ExplainType(Enum): Analyze = 1 Ast = 2 @@ -30,7 +28,6 @@ class ExplainType(Enum): Pipeline = 6 Fragment = 7 - def to_local_ttype(self): if self is ExplainType.Ast: return LocalExplainType.kAst diff --git a/python/infinity_embedded/utils.py b/python/infinity_embedded/utils.py index 8f3c2d7bb5..6857fded8e 100644 --- a/python/infinity_embedded/utils.py +++ b/python/infinity_embedded/utils.py @@ -1,4 +1,4 @@ -# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,5 +47,6 @@ def binary_exp_to_paser_exp(binary_expr_key) -> str: else: raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"unknown binary expression: {binary_expr_key}") + def deprecated_api(message): warnings.warn(message, DeprecationWarning, stacklevel=2) diff --git a/python/infinity_sdk/infinity/__init__.py b/python/infinity_sdk/infinity/__init__.py index 95bfba7981..4dff794bcc 100644 --- a/python/infinity_sdk/infinity/__init__.py +++ b/python/infinity_sdk/infinity/__init__.py @@ -26,7 +26,8 @@ from infinity.remote_thrift.infinity import RemoteThriftInfinityConnection from infinity.errors import ErrorCode -def connect(uri = LOCAL_HOST, logger: logging.Logger = None) -> InfinityConnection: + +def connect(uri=LOCAL_HOST, logger: logging.Logger = None) -> InfinityConnection: if isinstance(uri, NetworkAddress): return RemoteThriftInfinityConnection(uri, logger) else: diff --git a/python/infinity_sdk/infinity/common.py b/python/infinity_sdk/infinity/common.py index 2300818b9c..09b4fdeaa0 100644 --- a/python/infinity_sdk/infinity/common.py +++ b/python/infinity_sdk/infinity/common.py @@ -74,10 +74,12 @@ class ConflictType(object): Error = 1 Replace = 2 + class SortType(object): Asc = 0 Desc = 1 + class InfinityException(Exception): def __init__(self, error_code=0, error_message=None): self.error_code = error_code diff --git a/python/infinity_sdk/infinity/connection_pool.py b/python/infinity_sdk/infinity/connection_pool.py index 6f4893e07e..e2f74490ad 100644 --- a/python/infinity_sdk/infinity/connection_pool.py +++ b/python/infinity_sdk/infinity/connection_pool.py @@ -1,3 +1,17 @@ +# Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from threading import Lock import infinity from infinity.common import NetworkAddress @@ -5,7 +19,7 @@ class ConnectionPool(object): - def __init__(self, uri = NetworkAddress("127.0.0.1", 23817), max_size=16): + def __init__(self, uri=NetworkAddress("127.0.0.1", 23817), max_size=16): self.uri_ = uri self.max_size_ = max_size self.free_pool_ = [] @@ -13,7 +27,6 @@ def __init__(self, uri = NetworkAddress("127.0.0.1", 23817), max_size=16): for i in range(max_size): self._create_conn() - def _del__(self): self.destroy() @@ -21,7 +34,6 @@ def _create_conn(self): infinity_coon = infinity.connect(self.uri_) self.free_pool_.append(infinity_coon) - def get_conn(self): with self.lock_: if (len(self.free_pool_) == 0): @@ -30,20 +42,18 @@ def get_conn(self): logging.debug("get_conn") return conn - def release_conn(self, conn): """ Note: User is allowed to release a connection not created by ConnectionPool, or not releasing(due to exception or some other reasons) a connection created by ConnectionPool. """ with self.lock_: - if(self.free_pool_.count(conn)): + if (self.free_pool_.count(conn)): raise Exception("the connection has been released") if (len(self.free_pool_) < self.max_size_): self.free_pool_.append(conn) logging.debug("release_conn") - def destroy(self): for conn in iter(self.free_pool_): conn.disconnect() - self.free_pool_.clear() \ No newline at end of file + self.free_pool_.clear() diff --git a/python/infinity_sdk/infinity/db.py b/python/infinity_sdk/infinity/db.py index 1e1693c890..635ee74581 100644 --- a/python/infinity_sdk/infinity/db.py +++ b/python/infinity_sdk/infinity/db.py @@ -14,6 +14,7 @@ from abc import ABC, abstractmethod + class Database(ABC): @abstractmethod diff --git a/python/infinity_sdk/infinity/errors.py b/python/infinity_sdk/infinity/errors.py index 1e482d3eb8..df959070f4 100644 --- a/python/infinity_sdk/infinity/errors.py +++ b/python/infinity_sdk/infinity/errors.py @@ -117,6 +117,15 @@ class ErrorCode(IntEnum): INVALID_EXPLAIN_TYPE = 3081, CHUNK_NOT_EXIST = 3082, NAME_MISMATCHED = 3083, + TRANSACTION_NOT_FOUND = 3084, + INVALID_DATABASE_INDEX = 3085, + INVALID_TABLE_INDEX = 3086, + FUNCTION_IS_DISABLE = 3087, + NOT_FOUND = 3088, + ERROR_INIT = 3089, + FILE_IS_OPEN = 3090, + UNKNOWN = 3091, + INVALID_QUERY_OPTION = 3092, TXN_ROLLBACK = 4001, TXN_CONFLICT = 4002, @@ -126,6 +135,7 @@ class ErrorCode(IntEnum): TOO_MANY_CONNECTIONS = 5003, CONFIGURATION_LIMIT_EXCEED = 5004, QUERY_IS_TOO_COMPLEX = 5005, + FAIL_TO_GET_SYS_INFO = 5006, QUERY_CANCELLED = 6001, QUERY_NOT_SUPPORTED = 6002, @@ -147,7 +157,26 @@ class ErrorCode(IntEnum): MUNMAP_FILE_ERROR = 7014, INVALID_FILE_FLAG = 7015, INVALID_SERVER_ADDRESS = 7016, + FAIL_TO_FUN_PYTHON = 7017, + CANT_CONNECT_SERVER = 7018, + NOT_EXIST_NODE = 7019, + DUPLICATE_NODE = 7020, + CANT_CONNECT_LEADER = 7021, + MINIO_INVALID_ACCESS_KEY = 7022, + MINIO_BUCKET_NOT_EXISTS = 7023, + INVALID_STORAGE_TYPE = 7024, + NOT_REGISTERED = 7025, + CANT_SWITCH_ROLE = 7026, + TOO_MANY_FOLLOWER = 7027, + TOO_MANY_LEARNER = 7028, INVALID_ENTRY = 8001, - NOT_FOUND_ENTRY = 8002, - EMPTY_ENTRY_LIST = 8003, + DUPLICATE_ENTRY = 8002 + NOT_FOUND_ENTRY = 8003, + EMPTY_ENTRY_LIST = 8004, + NO_WAL_ENTRY_FOUND = 8005, + WRONG_CHECKPOINT_TYPE = 8006, + INVALID_NODE_ROLE = 8007, + INVALID_NODE_STATUS = 8008, + NODE_INFO_UPDATED = 8009, + NODE_NAME_MISMATCH = 8010 \ No newline at end of file diff --git a/python/infinity_sdk/infinity/remote_thrift/db.py b/python/infinity_sdk/infinity/remote_thrift/db.py index 60e2b8553e..e5da17858e 100644 --- a/python/infinity_sdk/infinity/remote_thrift/db.py +++ b/python/infinity_sdk/infinity/remote_thrift/db.py @@ -28,6 +28,7 @@ from infinity.common import ConflictType from infinity.common import InfinityException + class RemoteDatabase(Database, ABC): def __init__(self, conn, name: str): self._conn = conn diff --git a/python/infinity_sdk/infinity/remote_thrift/types.py b/python/infinity_sdk/infinity/remote_thrift/types.py index 95dcdafa31..08f7b866a5 100644 --- a/python/infinity_sdk/infinity/remote_thrift/types.py +++ b/python/infinity_sdk/infinity/remote_thrift/types.py @@ -18,7 +18,7 @@ from infinity.common import VEC, SparseVector, InfinityException from infinity.remote_thrift.infinity_thrift_rpc.ttypes import * from collections import defaultdict -from typing import Any, Tuple, Dict, List, Optional +from typing import Any, Optional from datetime import date, time, datetime, timedelta import polars as pl @@ -174,14 +174,16 @@ def column_vector_to_list(column_type: ttypes.ColumnType, column_data_type: ttyp case _: raise NotImplementedError(f"Unsupported type {column_type}") + def parse_date_bytes(column_vector): parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) date_list = [] epoch = date(1970, 1, 1) - for value in parsed_list: - date_list.append((epoch + timedelta(days = value)).strftime('%Y-%m-%d')) + for value in parsed_list: + date_list.append((epoch + timedelta(days=value)).strftime('%Y-%m-%d')) return date_list + def parse_time_bytes(column_vector): parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) time_list = [] @@ -192,15 +194,18 @@ def parse_time_bytes(column_vector): time_list.append(time(hour=hours, minute=minutes, second=seconds).strftime('%H:%M:%S')) return time_list + def parse_datetime_bytes(column_vector): parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) datetime_list = [] epoch = datetime(1970, 1, 1) for i in range(0, len(parsed_list), 2): if i + 1 < len(parsed_list): - datetime_list.append((epoch + timedelta(days = parsed_list[i], seconds = parsed_list[i + 1])).strftime('%Y-%m-%d %H:%M:%S')); + datetime_list.append( + (epoch + timedelta(days=parsed_list[i], seconds=parsed_list[i + 1])).strftime('%Y-%m-%d %H:%M:%S')); return datetime_list + def parse_interval_bytes(column_vector): parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector)) interval_list = [] @@ -208,6 +213,7 @@ def parse_interval_bytes(column_vector): interval_list.append(str(timedelta(seconds=value).total_seconds()) + 's') return interval_list + def parse_bytes(bytes_data): results = [] offset = 0 @@ -299,6 +305,7 @@ def tensor_to_list(column_data_type: ttypes.DataType, binary_data) -> list[list[ raise NotImplementedError( f"Unsupported type {column_data_type.physical_type.embedding_type.element_type}") + def parse_sparse_bytes(column_data_type: ttypes.DataType, column_vector): dimension = column_data_type.physical_type.sparse_type.dimension element_type = column_data_type.physical_type.sparse_type.element_type @@ -491,6 +498,7 @@ def make_match_sparse_expr(vector_column_name: str, sparse_data: SparseVector | for k, v in opt_params.items(): match_sparse_options.append(InitParameter(param_name=k, param_value=v)) - match_sparse_expr = MatchSparseExpr(column_expr=column_expr, query_sparse_expr=query_sparse_expr, metric_type=metric_type, + match_sparse_expr = MatchSparseExpr(column_expr=column_expr, query_sparse_expr=query_sparse_expr, + metric_type=metric_type, topn=topn, opt_params=match_sparse_options, filter_expr=filter_expr) return match_sparse_expr diff --git a/python/infinity_sdk/infinity/remote_thrift/utils.py b/python/infinity_sdk/infinity/remote_thrift/utils.py index 4dfa23080d..5e7bed11ff 100644 --- a/python/infinity_sdk/infinity/remote_thrift/utils.py +++ b/python/infinity_sdk/infinity/remote_thrift/utils.py @@ -26,7 +26,6 @@ from infinity.utils import binary_exp_to_paser_exp from infinity.common import InfinityException, SparseVector from infinity.errors import ErrorCode -from datetime import date, time, datetime, timedelta def traverse_conditions(cons, fn=None) -> ttypes.ParsedExpr: diff --git a/python/infinity_sdk/infinity/table.py b/python/infinity_sdk/infinity/table.py index 10393991ce..a2205ad629 100644 --- a/python/infinity_sdk/infinity/table.py +++ b/python/infinity_sdk/infinity/table.py @@ -21,6 +21,7 @@ from infinity.common import InfinityException, INSERT_DATA from infinity.errors import ErrorCode + class ExplainType(Enum): Analyze = 1 Ast = 2 @@ -46,4 +47,4 @@ def to_ttype(self): elif self is ExplainType.Fragment: return ttypes.ExplainType.Fragment else: - raise InfinityException(ErrorCode.INVALID_EXPLAIN_TYPE, "Unknown explain type") \ No newline at end of file + raise InfinityException(ErrorCode.INVALID_EXPLAIN_TYPE, "Unknown explain type") diff --git a/python/infinity_sdk/infinity/utils.py b/python/infinity_sdk/infinity/utils.py index 2514dfe445..64d7b82101 100644 --- a/python/infinity_sdk/infinity/utils.py +++ b/python/infinity_sdk/infinity/utils.py @@ -47,5 +47,6 @@ def binary_exp_to_paser_exp(binary_expr_key) -> str: else: raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"unknown binary expression: {binary_expr_key}") + def deprecated_api(message): warnings.warn(message, DeprecationWarning, stacklevel=2) diff --git a/python/test_cluster/test_basic.py b/python/test_cluster/test_basic.py index 90898397a4..3955d9aba2 100644 --- a/python/test_cluster/test_basic.py +++ b/python/test_cluster/test_basic.py @@ -42,29 +42,29 @@ def test_0(cluster: InfinityCluster): cluster.remove_node("node1") -def test_mock(mock_cluster: MockInfinityCluster): - cluster = mock_cluster - with cluster: - cluster.add_node("node1", "conf/leader.toml") - cluster.add_node("node2", "conf/follower.toml") - - cluster.set_leader("node1") - cluster.set_follower("node2") - - time.sleep(1) - - cluster.disconnect("node2") - time.sleep(0.1) - cluster.reconnect("node2") - - cluster.block_peer_net("node2") - time.sleep(0.1) - cluster.restore_peer_net("node2") - - time.sleep(1) - - cluster.remove_node("node2") - cluster.remove_node("node1") +# def test_mock(mock_cluster: MockInfinityCluster): +# cluster = mock_cluster +# with cluster: +# cluster.add_node("node1", "conf/leader.toml") +# cluster.add_node("node2", "conf/follower.toml") +# +# cluster.set_leader("node1") +# cluster.set_follower("node2") +# +# time.sleep(1) +# +# cluster.disconnect("node2") +# time.sleep(0.1) +# cluster.reconnect("node2") +# +# cluster.block_peer_net("node2") +# time.sleep(0.1) +# cluster.restore_peer_net("node2") +# +# time.sleep(1) +# +# cluster.remove_node("node2") +# cluster.remove_node("node1") @pytest.mark.docker diff --git a/python/test_cluster/test_insert.py b/python/test_cluster/test_insert.py index 42a811ffeb..1ad7559806 100644 --- a/python/test_cluster/test_insert.py +++ b/python/test_cluster/test_insert.py @@ -60,8 +60,8 @@ def __test_inner_1(self, cluster: InfinityCluster): def test_insert_11(self, cluster: InfinityCluster): self.__test_inner_1(cluster) - def test_insert_12(self, mock_cluster: MockInfinityCluster): - self.__test_inner_1(mock_cluster) + # def test_insert_12(self, mock_cluster: MockInfinityCluster): + # self.__test_inner_1(mock_cluster) @pytest.mark.docker def test_insert_13(self, docker_cluster: DockerInfinityCluster):