Skip to content

Commit

Permalink
Merge branch 'main' into pysdk_version_dev2
Browse files Browse the repository at this point in the history
  • Loading branch information
loloxwg authored Dec 20, 2023
2 parents 306e695 + 631d8fc commit 3887b0b
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

# build directory of Gradle, CMake etc
tmp
build
build*/
cmake-build-*

# ignore compilation configure file
Expand Down
2 changes: 1 addition & 1 deletion benchmark/remote_infinity/remote_query_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct InfinityClient {
std::unique_ptr<InfinityServiceClient> client;
int64_t session_id;
InfinityClient() {
socket.reset(new TSocket("127.0.0.1", 9080));
socket.reset(new TSocket("127.0.0.1", 23817));
transport.reset(new TBufferedTransport(socket));
protocol.reset(new TBinaryProtocol(transport));
client = std::make_unique<InfinityServiceClient>(protocol);
Expand Down
4 changes: 2 additions & 2 deletions conf/infinity_conf.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ sdk_port = 23817

[system]
# 0 means use all cpus
worker_cpu_limit = 4
worker_cpu_limit = 0
# 0 means use all memory
total_memory_size = "8GB"
# memory limit per query
Expand All @@ -28,7 +28,7 @@ log_max_size = "10GB"
log_file_rotate_count = 10

# trace/info/warning/error/critical 5 log levels, default: info
log_level = "trace"
log_level = "warning"

[storage]
data_dir = "/tmp/infinity/data"
Expand Down
84 changes: 84 additions & 0 deletions docs/benchmark.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Benchmark

**infinity** supply python script for sift1m and gist1m dataset benchmark.

## Get infinity binary file

```sh
git clone https://github.com/infiniflow/infinity.git
cd infinity
```

## download benchmark file

download via wget

```sh
#download sift benchmark
wget ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz
#download gist benchmark
wget ftp://ftp.irisa.fr/local/texmex/corpus/gist.tar.gz

```

or
visit [http://corpus-texmex.irisa.fr/](http://corpus-texmex.irisa.fr/) to download manually.

```sh
#uncompress and move benchmark file
tar -zxvf sift.tar.gz
mv sift/sift_base.fvecs test/data/benchmark/sift_1m/sift_base.fvecs
mv sift/sift_query.fvecs test/data/benchmark/sift_1m/sift_query.fvecs
mv sift/sift_groundtruth.ivecs test/data/benchmark/sift_1m/sift_groundtruth.ivecs


tar -zxvf gist.tar.gz
mv gist/gist_base.fvecs test/data/benchmark/gist_1m/gist_base.fvecs
mv gist/gist_query.fvecs test/data/benchmark/gist_1m/gist_query.fvecs
mv gist/gist_groundtruth.ivecs test/data/benchmark/gist_1m/gist_groundtruth.ivecs

```

## Benchmark dependency

```sh
cd python

pip install -r requirements.txt
python setup.py bdist_wheel
pip install dist/infinity_sdk-0.1.0.dev1-py3-none-any.whl
```

## Start infinity

Read [README.md](https://github.com/infiniflow/infinity/blob/main/README.md) to start infinity.

## Import data

```sh
cd benchmark

options:
-h, --help show this help message and exit
-d DATA_SET, --data DATA_SET

python remote_benchmark_import.py -d sift_1m
python remote_benchmark_import.py -d gist_1m
```

## run benchmark

```sh
options:
-h, --help show this help message and exit
-t THREADS, --threads THREADS
-r ROUNDS, --rounds ROUNDS
-d DATA_SET, --data DATA_SET

# ROUNDS means how many times python runs the benchmark. The result is the average for each time.

# following command means run benchmark with 1 thread, for 1 time using sift dataset
python remote_benchmark.py -t 1 -r 1 -d sift_1m

python remote_benchmark.py -t 1 -r 1 -d gist_1m
```
5 changes: 2 additions & 3 deletions python/benchmark/remote_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,12 @@ def process_pool(threads, rounds, query_path, tabel_name):
queries = fvecs_read_all(query_path)

for i in range(rounds):
start = time.time()
p = multiprocessing.Pool(threads)
for idx, query_vec in enumerate(queries):
start = time.time()
for idx, query_vec in enumerate(fvecs_read(query_path)):
p.apply_async(work, args=(query_vec, 100, "l2", "col1", "float", tabel_name))
p.close()
p.join()

end = time.time()
dur = end - start
results.append(f"Round {i + 1}:")
Expand Down
2 changes: 1 addition & 1 deletion python/benchmark/test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def drop_index(infinity_obj, port, process_id, thread_id, num_iteration):

ip: str = '0.0.0.0'
thrift = ("Thrift", ip, 9090)
thread_pool_thrift = ("Thread Pool Thrift", ip, 9080)
thread_pool_thrift = ("Thread Pool Thrift", ip, 23817)
async_thrift = ("AsyncThrift", ip, 9070)
num_processes = 16
num_threads = 16
Expand Down
2 changes: 1 addition & 1 deletion python/infinity/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
def connect(
uri: URI = LOCAL_HOST
) -> InfinityConnection:
if isinstance(uri, NetworkAddress) and (uri.port == 9090 or uri.port == 9080 or uri.port == 9070):
if isinstance(uri, NetworkAddress) and (uri.port == 9090 or uri.port == 23817 or uri.port == 9070):
return RemoteThriftInfinityConnection(uri)
else:
raise Exception(f"unknown uri: {uri}")
4 changes: 2 additions & 2 deletions python/infinity/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ def __str__(self):
VEC = Union[list, np.ndarray]
INSERT_DATA = dict[str, Union[str, int, float, list[Union[int, float]]]]

REMOTE_HOST = NetworkAddress("127.0.0.1", 9080)
LOCAL_HOST = NetworkAddress("0.0.0.0", 9080)
REMOTE_HOST = NetworkAddress("127.0.0.1", 23817)
LOCAL_HOST = NetworkAddress("0.0.0.0", 23817)
4 changes: 2 additions & 2 deletions python/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_connection(self):
method: connect server
expect: connect and disconnect successfully
"""
ports = [9090, 9080, 9070]
ports = [9090, 23817, 9070]
for port in ports:
infinity_obj = infinity.connect(REMOTE_HOST)
assert infinity_obj
Expand Down Expand Up @@ -73,7 +73,7 @@ def test_infinity_thrift(self):
12.
expect: all operations successfully
"""
ports = [9080]
ports = [23817]
for port in ports:
infinity_obj = infinity.connect(REMOTE_HOST)
assert infinity_obj
Expand Down
9 changes: 7 additions & 2 deletions src/bin/infinity_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import stl;
import third_party;
import db_server;
import infinity_exception;
import infinity_context;

namespace {

Expand Down Expand Up @@ -145,13 +146,17 @@ auto main(int argc, char **argv) -> int {
StartupParameter parameters;
ParseArguments(argc, argv, parameters);

db_server.Init(parameters);
InfinityContext::instance().Init(parameters.config_path);

RegisterSignal();

InfinityContext::instance().config()->PrintAll();

// threaded_thrift_server.Init(9090);
// threaded_thrift_thread = infinity::Thread([&]() { threaded_thrift_server.Start(); });
u32 thrift_server_port = InfinityContext::instance().config()->sdk_port();

pool_thrift_server.Init(9080, 128);
pool_thrift_server.Init(thrift_server_port, 128);
pool_thrift_thread = infinity::Thread([&]() { pool_thrift_server.Start(); });

// non_block_pool_thrift_server.Init(9070, 64);
Expand Down
2 changes: 1 addition & 1 deletion src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ SharedPtr<String> Config::Init(const SharedPtr<String> &config_path) {
// Resource
{ system_option_.resource_dict_path_ = default_resource_dict_path; }
} else {
Printf("Read config from: {}", *config_path);
Printf("Read config from: {}\n", *config_path);
TomlTable config = TomlParseFile(*config_path);
// General
{
Expand Down
20 changes: 4 additions & 16 deletions src/network/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,28 @@ import connection;

namespace infinity {

void DBServer::Init(const StartupParameter &parameter) { config_path_ = Move(parameter.config_path); }

void DBServer::Run() {
if (initialized) {
return;
}

initialized = true;

InfinityContext::instance().Init(config_path_);

u16 pg_port = InfinityContext::instance().config()->pg_port();
const String &listen_address_ref = InfinityContext::instance().config()->listen_address();
const String &pg_listen_addr = InfinityContext::instance().config()->listen_address();

BoostErrorCode error;
AsioIpAddr address = asio_make_address(listen_address_ref, error);
AsioIpAddr address = asio_make_address(pg_listen_addr, error);
if (error) {
Printf("{} isn't a valid IPv4 address.\n", listen_address_ref);
Printf("{} isn't a valid IPv4 address.\n", pg_listen_addr);
infinity::InfinityContext::instance().UnInit();
return ;
}

acceptor_ptr_ = MakeUnique<AsioAcceptor>(io_service_, AsioEndPoint(address, pg_port));
CreateConnection();

if (config_path_) {
Printf("Start up database server, at: {} and port: {}, config: {}\n", listen_address_ref, pg_port, *config_path_);
} else {
Printf("Start up database server, at: {} and port: {}\n", listen_address_ref, pg_port);
}

InfinityContext::instance().config()->PrintAll();

Printf("Run 'psql -h {} -p {}' to connect to the server.\n", listen_address_ref, pg_port);
Printf("Run 'psql -h {} -p {}' to connect to the server.\n", pg_listen_addr, pg_port);

io_service_.run();
}
Expand Down
3 changes: 0 additions & 3 deletions src/network/db_server.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class ThriftServer;

export class DBServer {
public:
void Init(const StartupParameter& parameter);

void Run();

void Shutdown();
Expand All @@ -46,7 +44,6 @@ private:
atomic_u64 running_connection_count_{0};
AsioIOService io_service_{};
UniquePtr<AsioAcceptor> acceptor_ptr_{};
SharedPtr<String> config_path_{};
};

}
2 changes: 1 addition & 1 deletion src/network/thrift_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ void PoolThriftServer::Init(i32 port_no, i32 pool_size) {
threadManager->threadFactory(threadFactory);
threadManager->start();

std::cout << "Thrift server listen on: 0.0.0.0:" << port_no << ", thread pool: " << pool_size << std::endl;
std::cout << "API server listen on: 0.0.0.0:" << port_no << ", thread pool: " << pool_size << std::endl;

server =
MakeUnique<TThreadPoolServer>(MakeShared<infinity_thrift_rpc::InfinityServiceProcessorFactory>(MakeShared<InfinityServiceCloneFactory>()),
Expand Down

0 comments on commit 3887b0b

Please sign in to comment.