diff --git a/docs/references/configurations.mdx b/docs/references/configurations.mdx index 143472d01d..38be8dfc3c 100644 --- a/docs/references/configurations.mdx +++ b/docs/references/configurations.mdx @@ -153,6 +153,18 @@ mem_index_capacity = 1048576 # Range: {"local"|"minio"} storage_type = "local" +# The number of dense vector index building worker threads. Defaults to the half number of CPU cores. +# Range: [1, number of CPU cores] +dense_index_building_worker = 2 + +# The number of sparse vector index building worker threads. Defaults to the half number of CPU cores. +# Range: [1, number of CPU cores] +sparse_index_building_worker = 2 + +# The number of fulltext index building worker threads. Defaults to the half number of CPU cores. +# Range: [1, number of CPU cores] +fulltext_index_building_worker = 2 + # Object storage configuration [storage.object_storage] # URL of the object storage server diff --git a/docs/references/http_api_reference.mdx b/docs/references/http_api_reference.mdx index 915ae9dc17..1bdb6484e0 100644 --- a/docs/references/http_api_reference.mdx +++ b/docs/references/http_api_reference.mdx @@ -1775,6 +1775,10 @@ Searches for data in a specified table. The search can range from a simple vecto - `"highlight"`: `string[]` - `"filter"`: `string` - `"fusion"`: `object` + - `"sort"` : `object[]` + - `"limit"` : `string` + - `"offset"` : `string` + - `"option"` : `object` ##### Request example @@ -2006,7 +2010,17 @@ curl --request GET \ - `"query_tensor"`: The tensor data to compare against. This should be provided as a list of lists of numerical values. - `"element_type"`: The element data type of the query tensor. Usually `"float"`. +- `"sort"` : `object[]` + Defines how to sort the results. +- `"limit"` : `string` + Indicates the limit row count. + +- `"offset"` : `string` + Indicates the offset position of the limit expression. You must use this parameter together with `limit`. + +- `"option"` : `object` + Indicates some search options. 
This parameter must be used in conjunction with `limit`. #### Response @@ -2029,11 +2043,14 @@ The response includes a JSON object like the following: "age": 16 } ] + "total_hits_count": 3 } ``` - `"error_code"`: `integer` `0`: The operation succeeds. +- `"total_hits_count"`: `integer`, Optional + Available if you set a search option with `"total_hits_count": "true"` diff --git a/docs/references/pysdk_api_reference.md b/docs/references/pysdk_api_reference.md index 44dcdbdfbb..5c09eabefb 100644 --- a/docs/references/pysdk_api_reference.md +++ b/docs/references/pysdk_api_reference.md @@ -1788,13 +1788,134 @@ table_object.output(["*"]).filter("filter_fulltext('doc', 'first second', 'minim --- +### sort + +```python +table_object.sort(sort_expression_list) +``` + +Creates a sort expression using `sort_expression_list`. + +#### Parameters + +##### sort_expression_list: `list`, *Required* + +An expression list defining how to sort the results. + +#### Returns + +- Success: An `infinity.local_infinity.table.LocalTable` object in embedded mode or an `infinity.remote_thrift.table.RemoteTable` object in client-server mode. +- Failure: `InfinityException` + - `error_code`: `int` A non-zero value indicating a specific error condition. + - `error_msg`: `str` A message providing additional details about the error. + +#### Examples + +```python +# Output results sorted by the `c2` expression in ascending order and the `c1` expression in descending order +table_obj.output(["c1", "c2"]).sort([["c2", SortType.Asc], ["c1", SortType.Desc]]).to_df() +``` + +--- + +### limit + +```python +table_object.limit(limit_num) +``` + +Creates an expression to limit the number of the output rows to a maximum of `limit_num`. + +#### Parameters + +##### limit_num: `int`, *Required* + +An integer specifying the maximum number of output rows. 
+ +#### Returns + +- Success: An `infinity.local_infinity.table.LocalTable` object in embedded mode or an `infinity.remote_thrift.table.RemoteTable` object in client-server mode. +- Failure: `InfinityException` + - `error_code`: `int` A non-zero value indicating a specific error condition. + - `error_msg`: `str` A message providing additional details about the error. + +#### Examples + +```python +# Limit the output row count to a maximum of two +table_instance.output(["num", "vec"]).limit(2).to_pl() +``` + +--- + +### offset + +```python +table_object.limit(limit_num).offset(offset_value) +``` + +Creates a limit expression with an offset value, setting the output to start from `offset_value` and limiting the row count to a maximum of `limit_num`. This method must be used in conjunction with `limit()`. + +#### Parameters + +##### offset_value: `int`, *Required* + +An integer specifying the offset position of the limit expression. + +#### Returns + +- Success: An `infinity.local_infinity.table.LocalTable` object in embedded mode or an `infinity.remote_thrift.table.RemoteTable` object in client-server mode. +- Failure: `InfinityException` + - `error_code`: `int` A non-zero value indicating a specific error condition. + - `error_msg`: `str` A message providing additional details about the error. + +#### Examples + +```python +# Limit the output row count not more than 2, start from position 1 +table_instance.output(["num", "vec"]).offset(1).limit(2).to_pl() +``` + +### option + +```python +table_object.option(option_dict) +``` + +Indicates some search options. + +#### Parameters + +##### option_dict: `dict`, *Required* + +A dictionary specifying the following search options: + +- **"total_hits_count"**: `bool`, *Optional* + - Must combine with limit expression. If `"total_hits_count"` is `True`, the query will output an extra result including total hits row count of the query. 
+ +#### Returns + +- Success: An `infinity.local_infinity.table.LocalTable` object in embedded mode or an `infinity.remote_thrift.table.RemoteTable` object in client-server mode. +- Failure: `InfinityException` + - `error_code`: `int` A non-zero value indicating a specific error condition. + - `error_msg`: `str` A message providing additional details about the error. + +#### Examples + +```python +# Limit the output row count not more than 2, start from position 1, output an extra result to indicate total hits row count +table_instance.output(["num", "vec"]).limit(2).offset(1).option({"total_hits_count": True}).to_pl() +``` + +--- + ### match_dense ```python table_object.match_dense(vector_column_name, embedding_data, embedding_data_type, distance_type, topn, knn_params = None) ``` -Creates a dense vector search expression to identify the top n closest rows to the given dense vector. Suitable for working with dense vectors (dense embeddings) or multi-vectors (multiple dense embeddings in one row). +Creates a dense vector search expression to identify the closest top n rows to the given dense vector. Suitable for working with dense vectors (dense embeddings) or multi-vectors (multiple dense embeddings in one row). :::tip NOTE To display your query results, you must chain this method with `output(columns)`, which specifies the columns to output, and a method such as `to_pl()`, `to_df()`, or `to_arrow()` to format the query results. @@ -2285,7 +2406,7 @@ We recommend calling `to_df()`, `to_pl()`, or `to_arrow()` to format your result #### Returns -`tuple[dict[str, list[Any]], dict[str, Any]]` +A `tuple[dict[str, list[Any]], dict[str, Any], {}]` object ### to_df ```python table_object.to_df() ``` -Returns the query result in pandas DataFrame format. +Returns the query result as a tuple consisting of a pandas DataFrame and a dict. 
:::tip NOTE Call `to_df()` in a chain after (not necessarily "immediately after") `output(columns)` on the same table object. @@ -2301,13 +2422,13 @@ Call `to_df()` in a chain after (not necessarily "immediately after") `output(co #### Returns -A `pandas.DataFrame` object. +A `tuple[pandas.DataFrame, {}]` object #### Examples ```python # Format columns "c1" and C2" of the current table into a pandas DataFrame -res = table_object.output(["c1", "c2"]).to_df() +res, extra_res = table_object.output(["c1", "c2"]).to_df() ``` ### to_pl ```python table_object.to_pl() ``` -Returns the query result in Polas DataFrame format. +Returns the query result as a tuple consisting of a Polars DataFrame and a dict. :::tip NOTE Call `to_pl()` in a chain after (not necessarily "immediately after") `output(columns)` on the same table object. @@ -2324,13 +2445,13 @@ Call `to_pl()` in a chain after (not necessarily "immediately after") `output(co #### Returns -A `polas.DataFrame` object. +A `tuple[polars.DataFrame, {}]` object. #### Examples ```python -# Format a vector search result into a Polas DataFrame. -res = table_object.output(["*"]).match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", "ip", 10).to_pl() +# Format a vector search result into a Polars DataFrame. +res, extra_res = table_object.output(["*"]).match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", "ip", 10).to_pl() ``` ### to_arrow ```python table_object.to_arrow() ``` -Returns the query result in Apache Arrow Table format. +Returns the query result as a tuple consisting of an Apache Arrow Table and a dict. :::tip NOTE Call `to_arrow()` in a chain after (not necessarily "immediately after") `output(columns)` on the same table object. @@ -2347,13 +2468,13 @@ Call `to_arrow()` in a chain after (not necessarily "immediately after") `output #### Returns -A `pyarrow.Table` object. 
+A `tuple[pyarrow.Table, {}]` object. #### Examples ```python # Format the current table object into an Apache Arrow Table. -res = table_object.output(["*"]).filter("score >= 90").to_arrow() +res, extra_result = table_object.output(["*"]).filter("score >= 90").to_arrow() ``` --- diff --git a/example/delete_update_data.py b/example/delete_update_data.py index 2f1ac7ac9f..398c6edfe3 100644 --- a/example/delete_update_data.py +++ b/example/delete_update_data.py @@ -87,8 +87,10 @@ print('about to update data') table_instance.update("num = 2", {"body": "unnecessary and harmful", "vec": [14.0, 7.2, 0.8, 10.9]}) - result = table_instance.output(["*"]).to_pl() + result, extra_result = table_instance.output(["*"]).to_pl() print(result) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() print('test done') diff --git a/example/export_data.py b/example/export_data.py index d39ce8a5b8..d3d655487e 100644 --- a/example/export_data.py +++ b/example/export_data.py @@ -86,7 +86,7 @@ }, { "num": 7, - "body": "Chris", + "name": "Chris", "age": 21, "score": 88.0, }, diff --git a/example/filter_data.py b/example/filter_data.py index abb067511d..a13eecfeab 100644 --- a/example/filter_data.py +++ b/example/filter_data.py @@ -72,7 +72,7 @@ }, { "num": 7, - "body": "Chris", + "name": "Chris", "score": 88.0, }, { @@ -99,8 +99,10 @@ # result = table_instance.output(["num", "name", "score"]).filter("not (score > 80.0)").to_pl() # print(result) - result = table_instance.output(["num", "name", "score"]).filter("num <> 9").to_pl() + result, extra_result = table_instance.output(["num", "name", "score"]).filter("num <> 9").to_pl() print(result) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() print('test done') diff --git a/example/filter_fulltext_keyword.py b/example/filter_fulltext_keyword.py index 5809da1333..daf6e691a9 100644 --- a/example/filter_fulltext_keyword.py +++ b/example/filter_fulltext_keyword.py @@ -101,16 +101,22 @@ 
) # output 7, 8, 9, 10 - result = table_instance.output(["*"]).filter("(score > 80.0) and (score <= 90.0)").to_pl() + result, extra_result = table_instance.output(["*"]).filter("(score > 80.0) and (score <= 90.0)").to_pl() print(result) + if extra_result is not None: + print(extra_result) # output 6, 8 - result = table_instance.output(["*"]).filter("filter_fulltext('uuid', 'UUID-2-1 UUID-2-3')").to_pl() + result, extra_result = table_instance.output(["*"]).filter("filter_fulltext('uuid', 'UUID-2-1 UUID-2-3')").to_pl() print(result) + if extra_result is not None: + print(extra_result) # output 8 - result = table_instance.output(["*"]).filter("(score > 80.0) and (score <= 90.0) and filter_fulltext('uuid', 'UUID-2-1 UUID-2-3')").to_pl() + result, extra_result = table_instance.output(["*"]).filter("(score > 80.0) and (score <= 90.0) and filter_fulltext('uuid', 'UUID-2-1 UUID-2-3')").to_pl() print(result) + if extra_result is not None: + print(extra_result) # drop table db_instance.drop_table("my_table") diff --git a/example/fulltext_search.py b/example/fulltext_search.py index f3e6102187..75ad5c962d 100644 --- a/example/fulltext_search.py +++ b/example/fulltext_search.py @@ -86,13 +86,15 @@ r'"harmful chemical"~10', # sloppy phrase, refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query-phrase.html ] for question in questions: - qb_result = ( + qb_result, extra_result = ( table_instance.output(["num", "body", "_score"]).highlight(["body"]) .match_text("body", question, 10) .to_pl() ) print(f"question: {question}") print(qb_result) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() diff --git a/example/fulltext_search_zh.py b/example/fulltext_search_zh.py index 58f7deaf22..d658058c77 100644 --- a/example/fulltext_search_zh.py +++ b/example/fulltext_search_zh.py @@ -112,9 +112,11 @@ r'"Bloom filter"', # phrase: adjacent multiple terms ] for question in questions: - qb_result = 
table_instance.output(["num", "body", "_score"]).highlight(["body"]).match_text("body", question, 10).to_pl() + qb_result, extra_result = table_instance.output(["num", "body", "_score"]).highlight(["body"]).match_text("body", question, 10).to_pl() print(f"question: {question}") print(qb_result) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() diff --git a/example/functions.py b/example/functions.py index 440216155b..a7821643b2 100644 --- a/example/functions.py +++ b/example/functions.py @@ -26,55 +26,85 @@ # varchar functions #function char_length -res = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = 1").to_df() +res, extra_result = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = 1").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = 3").to_df() +res, extra_result = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = 3").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = 4").to_df() +res, extra_result = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = 4").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = char_length(c2)").to_df() +res, extra_result = table_obj.output(["*", "char_length(c1)"]).filter("char_length(c1) = char_length(c2)").to_df() print(res) +if extra_result is not None: + print(extra_result) #function regex -res = table_obj.output(["*", "regex(c1, 'bc')"]).filter("regex(c1, 'bc')").to_df() +res, extra_result = table_obj.output(["*", "regex(c1, 'bc')"]).filter("regex(c1, 'bc')").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*"]).filter("regex(c1, 
'(\w+([-+.]\w+)*)@(\w+([-.]\w+)*)\.(\w+([-.]\w+)*)')").to_df() +res, extra_result = table_obj.output(["*"]).filter("regex(c1, '(\w+([-+.]\w+)*)@(\w+([-.]\w+)*)\.(\w+([-.]\w+)*)')").to_df() print(res) +if extra_result is not None: + print(extra_result) #function substring -res = table_obj.output(["*", "substring(c1, 0, 2)"]).filter("substring(c1, 0, 2) = 'ab'").to_df() +res, extra_result = table_obj.output(["*", "substring(c1, 0, 2)"]).filter("substring(c1, 0, 2) = 'ab'").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "substring(c1, 0, 4)"]).filter("substring(c1, 0, 4) = 'test'").to_df() +res, extra_result = table_obj.output(["*", "substring(c1, 0, 4)"]).filter("substring(c1, 0, 4) = 'test'").to_df() print(res) +if extra_result is not None: + print(extra_result) #function upper and lower -res = table_obj.output(["*", "upper(c1)"]).filter("upper(c1) = 'TEST@GMAIL.COM'").to_df() +res, extra_result = table_obj.output(["*", "upper(c1)"]).filter("upper(c1) = 'TEST@GMAIL.COM'").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*"]).filter("lower('ABC') = c1").to_df() +res, extra_result = table_obj.output(["*"]).filter("lower('ABC') = c1").to_df() print(res) +if extra_result is not None: + print(extra_result) #function ltrim, rtrim, trim -res = table_obj.output(["*", "ltrim(c1)"]).filter("ltrim(c1) = 'abc'").to_df() +res, extra_result = table_obj.output(["*", "ltrim(c1)"]).filter("ltrim(c1) = 'abc'").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "rtrim(c1)"]).filter("rtrim(c1) = 'abc'").to_df() +res, extra_result = table_obj.output(["*", "rtrim(c1)"]).filter("rtrim(c1) = 'abc'").to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "trim(c1)"]).filter("trim(c1) = 'abc'").to_df() +res, extra_result = table_obj.output(["*", "trim(c1)"]).filter("trim(c1) = 'abc'").to_df() 
print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*"]).filter("trim(' abc ') = rtrim(ltrim(' abc '))").to_df() +res, extra_result = table_obj.output(["*"]).filter("trim(' abc ') = rtrim(ltrim(' abc '))").to_df() print(res) +if extra_result is not None: + print(extra_result) #function char_position -res = table_obj.output(["*", "char_position(c1, 'bc')"]).filter("char_position(c1, c1) <> 0").to_df() +res, extra_result = table_obj.output(["*", "char_position(c1, 'bc')"]).filter("char_position(c1, c1) <> 0").to_df() print(res) +if extra_result is not None: + print(extra_result) # math functions db_obj.drop_table("function_example", ConflictType.Ignore) @@ -87,27 +117,39 @@ {"c1": 9, "c2": 10}, {"c1": 11, "c2": 12}, {"c1": 13, "c2": 14}, {"c1": 15, "c2": 16},]) #function sqrt -res = table_obj.output(["*", "sqrt(c1)", "sqrt(c2)"]).to_df() +res, extra_result = table_obj.output(["*", "sqrt(c1)", "sqrt(c2)"]).to_df() print(res) +if extra_result is not None: + print(extra_result) -res = table_obj.output(["*", "sqrt(c1)", "sqrt(c2)"]).filter("sqrt(c1) = 3").to_df() +res, extra_result = table_obj.output(["*", "sqrt(c1)", "sqrt(c2)"]).filter("sqrt(c1) = 3").to_df() print(res) +if extra_result is not None: + print(extra_result) #function round -res = table_obj.output(["*", "round(c1)", "round(c2)"]).to_df() +res, extra_result = table_obj.output(["*", "round(c1)", "round(c2)"]).to_df() print(res) +if extra_result is not None: + print(extra_result) #function ceiling -res = table_obj.output(["*", "ceil(c1)", "ceil(c2)"]).to_df() +res, extra_result = table_obj.output(["*", "ceil(c1)", "ceil(c2)"]).to_df() print(res) +if extra_result is not None: + print(extra_result) #function floor -res = table_obj.output(["*", "floor(c1)", "floor(c2)"]).to_df() +res, extra_result = table_obj.output(["*", "floor(c1)", "floor(c2)"]).to_df() print(res) +if extra_result is not None: + print(extra_result) #function ln -res = table_obj.output(["*", "ln(c1)", 
"ln(c2)"]).to_df() +res, extra_result = table_obj.output(["*", "ln(c1)", "ln(c2)"]).to_df() print(res) +if extra_result is not None: + print(extra_result) res = db_obj.drop_table("function_example") diff --git a/example/http/insert_search_data.sh b/example/http/insert_search_data.sh index 289b2ff74e..0a5d4af466 100755 --- a/example/http/insert_search_data.sh +++ b/example/http/insert_search_data.sh @@ -144,7 +144,11 @@ curl --request GET \ ], "filter": "num > 1 and year < 2024", "offset": "1", - "limit": "1" + "limit": "1", + "option": + { + "total_hits_count": "true" + } } ' echo -e '\n\n-- search with dense vector' @@ -171,11 +175,7 @@ curl --request GET \ "metric_type": "cosine", "topn": 4 } - ], - "option": - { - "total_hits_count": "true" - } + ] } ' echo -e '\n\n-- search with sparse vector' diff --git a/example/hybrid_search.py b/example/hybrid_search.py index bb5cbd6e38..6692d84cc6 100644 --- a/example/hybrid_search.py +++ b/example/hybrid_search.py @@ -90,7 +90,7 @@ infinity.common.ConflictType.Error, ) - result = ( + result, extra_result = ( table_instance.output( ["num", "body", "vec", "sparse", "year", "tensor", "score()"] ) @@ -108,6 +108,8 @@ .to_pl() # .explain(explain_type=infinity.table.ExplainType.UnOpt) ) + if extra_result is not None: + print(extra_result) print(result) infinity_instance.disconnect() diff --git a/example/import_data.py b/example/import_data.py index 38027df883..a355fdba3e 100644 --- a/example/import_data.py +++ b/example/import_data.py @@ -48,8 +48,10 @@ table_instance.import_data(project_directory + "/../test/data/csv/fulltext_delete.csv", {"header": True, "file_type": "csv", "delimiter": "\t"}) - result = table_instance.output(["num", "doc"]).to_pl() + result, extra_result = table_instance.output(["num", "doc"]).to_pl() print(result) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() diff --git a/example/secondary_index.py b/example/secondary_index.py index 48d1eefbaa..3c4a7aac5a 100644 --- 
a/example/secondary_index.py +++ b/example/secondary_index.py @@ -55,8 +55,10 @@ ) table_instance.create_index("index1", infinity.index.IndexInfo("id", infinity.index.IndexType.Secondary)) - res = table_instance.filter("id='ID_1'").output(["*"]).to_pl() + res, extra_result = table_instance.filter("id='ID_1'").output(["*"]).to_pl() print(res) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() diff --git a/example/simple_example.py b/example/simple_example.py index d4117f200f..73799d4c5c 100644 --- a/example/simple_example.py +++ b/example/simple_example.py @@ -61,8 +61,10 @@ ] ) - res = table_instance.output(["num", "body", "vec"]).to_pl() + res, extra_result = table_instance.output(["num", "body", "vec"]).to_pl() print(res) + if extra_result is not None: + print(extra_result) infinity_instance.disconnect() diff --git a/example/sparse_vector_search.py b/example/sparse_vector_search.py index 0a66bdc1dc..979a841502 100644 --- a/example/sparse_vector_search.py +++ b/example/sparse_vector_search.py @@ -61,8 +61,11 @@ ] ) - result = table_instance.output(["num", "vec", "_similarity"]).match_sparse("vec", infinity.common.SparseVector([0, 20, 80], [1.0, 2.0, 3.0]), "ip", 3).to_pl() + result, extra_result = table_instance.output(["num", "vec", "_similarity"]).match_sparse("vec", infinity.common.SparseVector([0, 20, 80], [1.0, 2.0, 3.0]), "ip", 3).to_pl() print(result) + if extra_result is not None: + print(extra_result) + infinity_instance.disconnect() print('test done') diff --git a/example/tensor_search.py b/example/tensor_search.py index f9822adcc6..59072f41fc 100644 --- a/example/tensor_search.py +++ b/example/tensor_search.py @@ -62,10 +62,14 @@ }, ] ) - result = table_instance.output(["num", "vec", "_score"]).match_tensor("vec", - [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]], - 'float', 2).to_pl() + result, extra_result = table_instance.output(["num", "vec", "_score"]).match_tensor("vec", + [[0.9, 0.0, 0.0, 0.0], + [1.1, 0.0, 0.0, 
0.0]], + 'float', 2).to_pl() print(result) + if extra_result is not None: + print(extra_result) + infinity_instance.disconnect() print('test done') diff --git a/example/vector_search.py b/example/vector_search.py index 8fdb75f98d..ab377359f5 100644 --- a/example/vector_search.py +++ b/example/vector_search.py @@ -70,6 +70,7 @@ print(result) if extra_result is not None: print(extra_result) + infinity_instance.disconnect() print('test done') diff --git a/python/infinity_embedded/local_infinity/query_builder.py b/python/infinity_embedded/local_infinity/query_builder.py index 292c3133cb..4238820470 100644 --- a/python/infinity_embedded/local_infinity/query_builder.py +++ b/python/infinity_embedded/local_infinity/query_builder.py @@ -664,7 +664,7 @@ def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]) -> Infin self._sort = sort_list return self - def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], Any]: + def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], {}]: query = Query( columns=self._columns, highlight=self._highlight, @@ -679,7 +679,7 @@ def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], Any]: self.reset() return self._table._execute_query(query) - def to_df(self) -> (pd.DataFrame, Any): + def to_df(self) -> (pd.DataFrame, {}): df_dict = {} data_dict, data_type_dict, extra_result = self.to_result() for k, v in data_dict.items(): @@ -687,11 +687,11 @@ def to_df(self) -> (pd.DataFrame, Any): df_dict[k] = data_series return pd.DataFrame(df_dict), extra_result - def to_pl(self) -> (pl.DataFrame, Any): + def to_pl(self) -> (pl.DataFrame, {}): dataframe, extra_result = self.to_df() return pl.from_pandas(dataframe), extra_result - def to_arrow(self) -> (Table, Any): + def to_arrow(self) -> (Table, {}): dataframe, extra_result = self.to_df() return pa.Table.from_pandas(dataframe), extra_result diff --git a/python/infinity_http.py b/python/infinity_http.py index 59e3eef48f..e2f7be7d9a 100644 --- 
a/python/infinity_http.py +++ b/python/infinity_http.py @@ -3,6 +3,7 @@ import requests import logging +import json from test_pysdk.common.common_data import * from infinity.common import ConflictType, InfinityException, SparseVector, SortType from typing import Optional, Any @@ -711,6 +712,9 @@ def __init__(self, output: list, table_http: table_http): self._match_sparse = [] self._search_exprs = [] self._sort = [] + self._limit = None + self._offset = None + self._option = None def select(self): url = f"databases/{self.table_http.database_name}/tables/{self.table_http.table_name}/docs" @@ -726,6 +730,12 @@ def select(self): tmp["highlight"] = self._highlight if len(self._sort): tmp["sort"] = self._sort + if self._limit is not None: + tmp["limit"] = str(self._limit) + if self._offset is not None: + tmp["offset"] = str(self._offset) + if self._option is not None: + tmp["option"] = self._option # print(tmp) d = self.table_http.net.set_up_data([], tmp) r = self.table_http.net.request(url, "get", h, d) @@ -764,6 +774,13 @@ def explain(self, ExplainType=ExplainType.Physical): tmp["output"] = self._output if len(self._highlight): tmp["highlight"] = self._highlight + if self._limit is not None: + tmp["limit"] = self._limit + if self._offset is not None: + tmp["offset"] = self._offset + if self._option is not None: + tmp["option"] = self._option + tmp["explain_type"] = ExplainType_transfrom(ExplainType) # print(tmp) d = self.table_http.net.set_up_data([], tmp) @@ -803,6 +820,23 @@ def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]): self._sort.append(tmp) return self + def limit(self, limit_num): + self._limit = limit_num + return self + + def offset(self, offset): + self._offset = offset + return self + + def option(self, option: {}): + # option_str = json.dumps(option) + # option_str = str(option) + # option_str.replace("'\"'", "") + # eval(option_str) + # option_str.replace("'", "") + self._option = option + return self + def match_text(self, fields: 
str, query: str, topn: int, opt_params: Optional[dict] = None): tmp_match_expr = {"match_method": "text", "fields": fields, "matching_text": query, "topn": topn} if opt_params is not None: diff --git a/python/infinity_sdk/infinity/remote_thrift/query_builder.py b/python/infinity_sdk/infinity/remote_thrift/query_builder.py index 5b4f80916b..c6ac98289d 100644 --- a/python/infinity_sdk/infinity/remote_thrift/query_builder.py +++ b/python/infinity_sdk/infinity/remote_thrift/query_builder.py @@ -506,7 +506,7 @@ def sort(self, order_by_expr_list: Optional[List[list[str, SortType]]]) -> Infin self._sort = sort_list return self - def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], Any]: + def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], {}]: query = Query( columns=self._columns, highlight=self._highlight, @@ -521,7 +521,7 @@ def to_result(self) -> tuple[dict[str, list[Any]], dict[str, Any], Any]: self.reset() return self._table._execute_query(query) - def to_df(self) -> (pd.DataFrame, Any): + def to_df(self) -> (pd.DataFrame, {}): df_dict = {} data_dict, data_type_dict, extra_result = self.to_result() for k, v in data_dict.items(): @@ -529,11 +529,11 @@ def to_df(self) -> (pd.DataFrame, Any): df_dict[k] = data_series return pd.DataFrame(df_dict), extra_result - def to_pl(self) -> (pl.DataFrame, Any): + def to_pl(self) -> (pl.DataFrame, {}): dataframe, extra_result = self.to_df() return pl.from_pandas(dataframe), extra_result - def to_arrow(self) -> (Table, Any): + def to_arrow(self) -> (Table, {}): dataframe, extra_result = self.to_df() return pa.Table.from_pandas(dataframe), extra_result diff --git a/python/test_pysdk/test_limit.py b/python/test_pysdk/test_limit.py new file mode 100644 index 0000000000..77cdc249e7 --- /dev/null +++ b/python/test_pysdk/test_limit.py @@ -0,0 +1,106 @@ +import importlib +import sys +import os +import os +import pandas as pd +import pytest +from common import common_values +import infinity +import 
infinity.index as index +import infinity_embedded +from numpy import dtype +from infinity.errors import ErrorCode +from infinity.common import ConflictType, SortType + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) +from infinity_http import infinity_http +from common.utils import copy_data +from datetime import date, time, datetime + + +@pytest.fixture(scope="class") +def local_infinity(request): + return request.config.getoption("--local-infinity") + + +@pytest.fixture(scope="class") +def http(request): + return request.config.getoption("--http") + + +@pytest.fixture(scope="class") +def setup_class(request, local_infinity, http): + if local_infinity: + module = importlib.import_module("infinity_embedded.index") + globals()["index"] = module + module = importlib.import_module("infinity_embedded.common") + func = getattr(module, 'ConflictType') + globals()['ConflictType'] = func + func = getattr(module, 'InfinityException') + globals()['InfinityException'] = func + uri = common_values.TEST_LOCAL_PATH + request.cls.infinity_obj = infinity_embedded.connect(uri) + elif http: + uri = common_values.TEST_LOCAL_HOST + request.cls.infinity_obj = infinity_http() + else: + uri = common_values.TEST_LOCAL_HOST + request.cls.infinity_obj = infinity.connect(uri) + request.cls.uri = uri + yield + request.cls.infinity_obj.disconnect() + + +@pytest.mark.usefixtures("setup_class") +@pytest.mark.usefixtures("suffix") +class TestInfinity: + def test_limit(self, suffix): + db_obj = self.infinity_obj.get_database("default_db") + + # infinity + db_obj.drop_table("test_limit" + suffix, ConflictType.Ignore) + table_obj = db_obj.create_table( + "test_limit" + suffix, { + "c1": {"type": "int", "constraints": ["primary key", "not null"]}, + "c2": {"type": "int", "constraints": ["not null"]}}, ConflictType.Error) + + assert table_obj is not None + + res = table_obj.insert( + 
[{"c1": -3, "c2": 3}, {"c1": -2, "c2": 2}, {"c1": -1, "c2": 1}, {"c1": 0, "c2": 0}, {"c1": 1, "c2": 1}, + {"c1": 2, "c2": 2}, {"c1": 3, "c2": 3}]) + assert res.error_code == ErrorCode.OK + + res = table_obj.insert( + [{"c1": -8, "c2": 8}, {"c1": -7, "c2": 7}, {"c1": -6, "c2": 6}, {"c1": 7, "c2": 7}, {"c1": 8, "c2": 8}, + {"c1": 9, "c2": 9}]) + assert res.error_code == ErrorCode.OK + + res, extra_res = table_obj.output(["c1", "c2"]).limit(2).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-3, -2), + 'c2': (3, 2)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res, extra_res = table_obj.output(["c1", "c2"]).limit(2).offset(2).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-1, 0), + 'c2': (1, 0)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res, extra_res = table_obj.output(["c1", "c2"]).limit(2).offset(2).option({"total_hits_count": True}).to_df() + assert extra_res['total_hits_count'] == 13 + + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (-1, 0), + 'c2': (1, 0)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + + res, extra_res = table_obj.output(["c1", "c2"]).sort([["c2", SortType.Asc], ["c1", SortType.Desc]]).limit(3).option({"total_hits_count": True}).to_df() + pd.testing.assert_frame_equal(res, pd.DataFrame({'c1': (0, 1, -1), + 'c2': (0, 1, 1)}) + .astype({'c1': dtype('int32'), 'c2': dtype('int32')})) + assert extra_res['total_hits_count'] == 13 + + res = db_obj.drop_table("test_limit" + suffix, ConflictType.Error) + assert res.error_code == ErrorCode.OK diff --git a/src/common/default_values.cppm b/src/common/default_values.cppm index 49cf3430ce..d962ab620f 100644 --- a/src/common/default_values.cppm +++ b/src/common/default_values.cppm @@ -278,6 +278,9 @@ export { constexpr std::string_view MEMINDEX_MEMORY_QUOTA_OPTION_NAME = "memindex_memory_quota"; constexpr std::string_view RESULT_CACHE_OPTION_NAME = "result_cache"; constexpr std::string_view 
CACHE_RESULT_CAPACITY_OPTION_NAME = "cache_result_capacity"; + constexpr std::string_view DENSE_INDEX_BUILDING_WORKER_OPTION_NAME = "dense_index_building_worker"; + constexpr std::string_view SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME = "sparse_index_building_worker"; + constexpr std::string_view FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME = "fulltext_index_building_worker"; constexpr std::string_view WAL_DIR_OPTION_NAME = "wal_dir"; constexpr std::string_view WAL_COMPACT_THRESHOLD_OPTION_NAME = "wal_compact_threshold"; diff --git a/src/executor/operator/physical_show.cpp b/src/executor/operator/physical_show.cpp index e41cc17a1b..1d65e4f884 100644 --- a/src/executor/operator/physical_show.cpp +++ b/src/executor/operator/physical_show.cpp @@ -3185,6 +3185,69 @@ void PhysicalShow::ExecuteShowConfigs(QueryContext *query_context, ShowOperatorS } } + { + { + // option name + Value value = Value::MakeVarchar(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[0]); + } + { + // option name type + Value value = Value::MakeVarchar(std::to_string(global_config->DenseIndexBuildingWorker())); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[1]); + } + { + // option name type + Value value = Value::MakeVarchar("Dense vector index building worker count"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[2]); + } + } + + { + { + // option name + Value value = Value::MakeVarchar(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[0]); + } + { + // option name type + Value value = Value::MakeVarchar(std::to_string(global_config->SparseIndexBuildingWorker())); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[1]); + } + { + // option name type + Value value = 
Value::MakeVarchar("Sparse vector index building worker count"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[2]); + } + } + + { + { + // option name + Value value = Value::MakeVarchar(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[0]); + } + { + // option name type + Value value = Value::MakeVarchar(std::to_string(global_config->FulltextIndexBuildingWorker())); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[1]); + } + { + // option name type + Value value = Value::MakeVarchar("Full-text index building worker count"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[2]); + } + } + { { // option name diff --git a/src/main/config.cpp b/src/main/config.cpp index 8f0668ba78..dd389bcfdb 100644 --- a/src/main/config.cpp +++ b/src/main/config.cpp @@ -271,7 +271,8 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default // Peer connect timeout i64 peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; - UniquePtr peer_connect_timeout_option = MakeUnique(PEER_CONNECT_TIMEOUT_OPTION_NAME, peer_connect_timeout, 10000, 0); + UniquePtr peer_connect_timeout_option = + MakeUnique(PEER_CONNECT_TIMEOUT_OPTION_NAME, peer_connect_timeout, 10000, 0); status = global_options_.AddOption(std::move(peer_connect_timeout_option)); if (!status.ok()) { fmt::print("Fatal: {}", status.message()); @@ -482,6 +483,45 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default UnrecoverableError(status.message()); } + // Dense index building worker + i64 dense_index_building_worker = Thread::hardware_concurrency() / 2; + if (dense_index_building_worker < 2) { + dense_index_building_worker = 2; + } + UniquePtr dense_index_building_worker_option = + MakeUnique(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, 
dense_index_building_worker, Thread::hardware_concurrency(), 1); + status = global_options_.AddOption(std::move(dense_index_building_worker_option)); + if (!status.ok()) { + fmt::print("Fatal: {}", status.message()); + UnrecoverableError(status.message()); + } + + // Sparse index building worker + i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2; + if (sparse_index_building_worker < 2) { + sparse_index_building_worker = 2; + } + UniquePtr sparse_index_building_worker_option = + MakeUnique(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, sparse_index_building_worker, Thread::hardware_concurrency(), 1); + status = global_options_.AddOption(std::move(sparse_index_building_worker_option)); + if (!status.ok()) { + fmt::print("Fatal: {}", status.message()); + UnrecoverableError(status.message()); + } + + // Fulltext index building worker + i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2; + if (fulltext_index_building_worker < 2) { + fulltext_index_building_worker = 2; + } + UniquePtr fulltext_index_building_worker_option = + MakeUnique(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, fulltext_index_building_worker, Thread::hardware_concurrency(), 1); + status = global_options_.AddOption(std::move(fulltext_index_building_worker_option)); + if (!status.ok()) { + fmt::print("Fatal: {}", status.message()); + UnrecoverableError(status.message()); + } + // Result Cache String result_cache(DEFAULT_RESULT_CACHE); auto result_cache_option = MakeUnique(RESULT_CACHE_OPTION_NAME, result_cache); @@ -492,7 +532,8 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default } i64 cache_result_num = DEFAULT_CACHE_RESULT_CAPACITY; - auto cache_result_num_option = MakeUnique(CACHE_RESULT_CAPACITY_OPTION_NAME, cache_result_num, std::numeric_limits::max(), 0); + auto cache_result_num_option = + MakeUnique(CACHE_RESULT_CAPACITY_OPTION_NAME, cache_result_num, std::numeric_limits::max(), 0); status = 
global_options_.AddOption(std::move(cache_result_num_option)); if (!status.ok()) { fmt::print("Fatal: {}", status.message()); @@ -1744,6 +1785,61 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default } break; } + + case GlobalOptionIndex::kDenseIndexBuildingWorker: { + i64 dense_index_building_worker = Thread::hardware_concurrency() / 2; + if (elem.second.is_integer()) { + dense_index_building_worker = elem.second.value_or(dense_index_building_worker); + } else { + return Status::InvalidConfig("'dense_index_building_worker' field isn't integer."); + } + UniquePtr dense_index_building_worker_option = + MakeUnique(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, + dense_index_building_worker, + Thread::hardware_concurrency(), + 1); + if (!dense_index_building_worker_option->Validate()) { + return Status::InvalidConfig(fmt::format("Invalid dense vector index building number: {}", dense_index_building_worker)); + } + global_options_.AddOption(std::move(dense_index_building_worker_option)); + break; + } + case GlobalOptionIndex::kSparseIndexBuildingWorker: { + i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2; + if (elem.second.is_integer()) { + sparse_index_building_worker = elem.second.value_or(sparse_index_building_worker); + } else { + return Status::InvalidConfig("'sparse_index_building_worker' field isn't integer."); + } + UniquePtr sparse_index_building_worker_option = + MakeUnique(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, + sparse_index_building_worker, + Thread::hardware_concurrency(), + 1); + if (!sparse_index_building_worker_option->Validate()) { + return Status::InvalidConfig(fmt::format("Invalid sparse vector index building number: {}", sparse_index_building_worker)); + } + global_options_.AddOption(std::move(sparse_index_building_worker_option)); + break; + } + case GlobalOptionIndex::kFulltextIndexBuildingWorker: { + i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2; + if (elem.second.is_integer()) { + fulltext_index_building_worker = elem.second.value_or(fulltext_index_building_worker); + } 
else { + return Status::InvalidConfig("'fulltext_index_building_worker' field isn't integer."); + } + UniquePtr fulltext_index_building_worker_option = + MakeUnique(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, + fulltext_index_building_worker, + Thread::hardware_concurrency(), + 1); + if (!fulltext_index_building_worker_option->Validate()) { + return Status::InvalidConfig(fmt::format("Invalid fulltext vector index building number: {}", fulltext_index_building_worker)); + } + global_options_.AddOption(std::move(fulltext_index_building_worker_option)); + break; + } default: { return Status::InvalidConfig(fmt::format("Unrecognized config parameter: {} in 'storage' field", var_name)); } @@ -1830,6 +1926,52 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default } } + if (global_options_.GetOptionByIndex(GlobalOptionIndex::kDenseIndexBuildingWorker) == nullptr) { + // dense index building worker + i64 dense_index_building_worker = Thread::hardware_concurrency() / 2; + if (dense_index_building_worker < 2) { + dense_index_building_worker = 2; + } + UniquePtr dense_index_building_worker_option = MakeUnique(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, + dense_index_building_worker, + Thread::hardware_concurrency(), + 1); + Status status = global_options_.AddOption(std::move(dense_index_building_worker_option)); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + } + if (global_options_.GetOptionByIndex(GlobalOptionIndex::kSparseIndexBuildingWorker) == nullptr) { + // sparse index building worker + i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2; + if (sparse_index_building_worker < 2) { + sparse_index_building_worker = 2; + } + UniquePtr sparse_index_building_worker_option = MakeUnique(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, + sparse_index_building_worker, + Thread::hardware_concurrency(), + 1); + Status status = global_options_.AddOption(std::move(sparse_index_building_worker_option)); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + } + if 
(global_options_.GetOptionByIndex(GlobalOptionIndex::kFulltextIndexBuildingWorker) == nullptr) { + // fulltext index building worker + i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2; + if (fulltext_index_building_worker < 2) { + fulltext_index_building_worker = 2; + } + UniquePtr fulltext_index_building_worker_option = + MakeUnique(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, + fulltext_index_building_worker, + Thread::hardware_concurrency(), + 1); + Status status = global_options_.AddOption(std::move(fulltext_index_building_worker_option)); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + } } else { return Status::InvalidConfig("No 'storage' section in configure file."); } @@ -2543,6 +2685,21 @@ i64 Config::MemIndexCapacity() { return global_options_.GetIntegerValue(GlobalOptionIndex::kMemIndexCapacity); } +i64 Config::DenseIndexBuildingWorker() { + std::lock_guard guard(mutex_); + return global_options_.GetIntegerValue(GlobalOptionIndex::kDenseIndexBuildingWorker); +} + +i64 Config::SparseIndexBuildingWorker() { + std::lock_guard guard(mutex_); + return global_options_.GetIntegerValue(GlobalOptionIndex::kSparseIndexBuildingWorker); +} + +i64 Config::FulltextIndexBuildingWorker() { + std::lock_guard guard(mutex_); + return global_options_.GetIntegerValue(GlobalOptionIndex::kFulltextIndexBuildingWorker); +} + StorageType Config::StorageType() { std::lock_guard guard(mutex_); String storage_type_str = global_options_.GetStringValue(GlobalOptionIndex::kStorageType); @@ -2741,6 +2898,9 @@ void Config::PrintAll() { fmt::print(" - compact_interval: {}\n", Utility::FormatTimeInfo(CompactInterval())); fmt::print(" - optimize_index_interval: {}\n", Utility::FormatTimeInfo(OptimizeIndexInterval())); fmt::print(" - memindex_capacity: {}\n", MemIndexCapacity()); // mem index capacity is line number + fmt::print(" - dense_index_building_worker: {}\n", DenseIndexBuildingWorker()); + fmt::print(" - sparse_index_building_worker: {}\n", 
SparseIndexBuildingWorker()); + fmt::print(" - fulltext_index_building_worker: {}\n", FulltextIndexBuildingWorker()); fmt::print(" - storage_type: {}\n", ToString(StorageType())); switch (StorageType()) { case StorageType::kLocal: { diff --git a/src/main/config.cppm b/src/main/config.cppm index 66154f2541..4836235a69 100644 --- a/src/main/config.cppm +++ b/src/main/config.cppm @@ -102,6 +102,9 @@ public: void SetOptimizeInterval(i64); i64 MemIndexCapacity(); + i64 DenseIndexBuildingWorker(); + i64 SparseIndexBuildingWorker(); + i64 FulltextIndexBuildingWorker(); StorageType StorageType(); String ObjectStorageUrl(); diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp index bc849b8c02..442c6b5243 100644 --- a/src/main/infinity_context.cpp +++ b/src/main/infinity_context.cpp @@ -239,8 +239,7 @@ Status InfinityContext::ChangeServerRole(NodeRole target_role, bool from_leader, } task_scheduler_ = MakeUnique(config_.get()); - i64 cpu_limit = config_->CPULimit(); - SetIndexThreadPool(cpu_limit); + SetIndexThreadPool(); break; } case NodeRole::kStandalone: { @@ -513,21 +512,18 @@ void InfinityContext::UnInit() { config_.reset(); } -void InfinityContext::SetIndexThreadPool(SizeT thread_num) { - thread_num = thread_num / 2; - if (thread_num < 2) - thread_num = 2; - LOG_TRACE(fmt::format("Set index thread pool size to {}", thread_num)); - inverting_thread_pool_.resize(thread_num); - commiting_thread_pool_.resize(thread_num); - hnsw_build_thread_pool_.resize(thread_num); +void InfinityContext::SetIndexThreadPool() { + LOG_TRACE("Set index thread pool."); + inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker()); + commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker()); + hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker()); } void InfinityContext::RestoreIndexThreadPoolToDefault() { - LOG_TRACE("Restore index thread pool size to default"); - inverting_thread_pool_.resize(4); - commiting_thread_pool_.resize(2); - 
hnsw_build_thread_pool_.resize(4); + LOG_TRACE("Restore index thread pool size to default."); + inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker()); + commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker()); + hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker()); } void InfinityContext::AddThriftServerFn(std::function start_func, std::function stop_func) { diff --git a/src/main/infinity_context.cppm b/src/main/infinity_context.cppm index 136cc6b69d..fd0e60722e 100644 --- a/src/main/infinity_context.cppm +++ b/src/main/infinity_context.cppm @@ -64,7 +64,7 @@ public: void UnInit(); - void SetIndexThreadPool(SizeT thread_num); + void SetIndexThreadPool(); void RestoreIndexThreadPoolToDefault(); void AddThriftServerFn(std::function start_func, std::function stop_func); @@ -90,11 +90,11 @@ private: atomic_bool infinity_context_inited_{false}; // For fulltext index - ThreadPool inverting_thread_pool_{4}; + ThreadPool inverting_thread_pool_{2}; ThreadPool commiting_thread_pool_{2}; // For hnsw index - ThreadPool hnsw_build_thread_pool_{4}; + ThreadPool hnsw_build_thread_pool_{2}; mutable std::mutex mutex_; diff --git a/src/main/options.cpp b/src/main/options.cpp index d5b06c838c..113dc59ae9 100644 --- a/src/main/options.cpp +++ b/src/main/options.cpp @@ -78,6 +78,10 @@ GlobalOptions::GlobalOptions() { name2index_[String(TEMP_DIR_OPTION_NAME)] = GlobalOptionIndex::kTempDir; name2index_[String(MEMINDEX_MEMORY_QUOTA_OPTION_NAME)] = GlobalOptionIndex::kMemIndexMemoryQuota; + name2index_[String(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME)] = GlobalOptionIndex::kDenseIndexBuildingWorker; + name2index_[String(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME)] = GlobalOptionIndex::kSparseIndexBuildingWorker; + name2index_[String(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME)] = GlobalOptionIndex::kFulltextIndexBuildingWorker; + name2index_[String(RESULT_CACHE_OPTION_NAME)] = GlobalOptionIndex::kResultCache; 
name2index_[String(CACHE_RESULT_CAPACITY_OPTION_NAME)] = GlobalOptionIndex::kCacheResultCapacity; diff --git a/src/main/options.cppm b/src/main/options.cppm index 33b78f3a64..2ac15c9f45 100644 --- a/src/main/options.cppm +++ b/src/main/options.cppm @@ -165,8 +165,10 @@ export enum class GlobalOptionIndex : i8 { kPeerConnectTimeout = 49, kPeerRecvTimeout = 50, kPeerSendTimeout = 51, - - kInvalid = 52, + kDenseIndexBuildingWorker = 53, + kSparseIndexBuildingWorker = 54, + kFulltextIndexBuildingWorker = 55, + kInvalid = 57, }; export struct GlobalOptions { diff --git a/src/network/http/http_search.cpp b/src/network/http/http_search.cpp index e914594501..8733cd3e2b 100644 --- a/src/network/http/http_search.cpp +++ b/src/network/http/http_search.cpp @@ -202,17 +202,26 @@ void HTTPSearch::Process(Infinity *infinity_ptr, String key = option.key(); ToLower(key); if (key == "total_hits_count") { - String value = option.value(); - ToLower(value); - if (value == "true") { - total_hits_count_flag = true; - } else if (value == "false") { - total_hits_count_flag = false; + if(option.value().is_string()) { + String value = option.value(); + ToLower(value); + if (value == "true") { + total_hits_count_flag = true; + } else if (value == "false") { + total_hits_count_flag = false; + } else { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = fmt::format("Unknown search option: {}, value: {}", key, value); + return; + } + } else if(option.value().is_boolean()) { + total_hits_count_flag = option.value(); } else { response["error_code"] = ErrorCode::kInvalidExpression; - response["error_message"] = fmt::format("Unknown search option: {}, value: {}", key, value); + response["error_message"] = "Invalid total hits count type"; return; } + } } } else { diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index 938e76d66b..303644b89d 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ 
-1529,6 +1529,7 @@ SharedPtr ParallelMaterializedFragmentCtx::GetResultInternal() { std::set())); } + SizeT total_hits_count = 0; for (const auto &task : tasks_) { if (task->sink_state_->state_type() != SinkStateType::kMaterialize) { String error_message = "Parallel materialized fragment will only have common sink state"; @@ -1540,12 +1541,14 @@ SharedPtr ParallelMaterializedFragmentCtx::GetResultInternal() { result_table = DataTable::MakeResultTable(column_defs); } result_table->total_hits_count_flag_ = materialize_sink_state->total_hits_count_flag_; + total_hits_count += materialize_sink_state->total_hits_count_; for (auto &result_data_block : materialize_sink_state->data_block_array_) { result_table->Append(std::move(result_data_block)); } materialize_sink_state->data_block_array_.clear(); } + result_table->total_hits_count_ = total_hits_count; return result_table; } @@ -1590,6 +1593,7 @@ SharedPtr ParallelStreamFragmentCtx::GetResultInternal() { result_table = DataTable::MakeResultTable(column_defs); } result_table->total_hits_count_flag_ = materialize_sink_state->total_hits_count_flag_; + result_table->total_hits_count_ = materialize_sink_state->total_hits_count_; for (auto &result_data_block : materialize_sink_state->data_block_array_) { result_table->Append(std::move(result_data_block)); diff --git a/tools/run_http_api.py b/tools/run_http_api.py index 1812d2a90d..ba75651b20 100644 --- a/tools/run_http_api.py +++ b/tools/run_http_api.py @@ -44,7 +44,7 @@ def python_sdk_test(python_test_dir: str, pytest_mark: str): print("Note: this script must be run under root directory of the project.") current_path = os.getcwd() python_test_dir = current_path + "/python" - parser = argparse.ArgumentParser(description="Http Api Test For Infinity") + parser = argparse.ArgumentParser(description="Http API Test For Infinity") parser.add_argument( "-m", "--pytest_mark", @@ -54,7 +54,7 @@ def python_sdk_test(python_test_dir: str, pytest_mark: str): ) args = parser.parse_args() - 
print("Start Http Api testing...") + print("Start Http API testing...") start = time.time() try: python_sdk_test(python_test_dir, args.pytest_mark)