Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor thrift server port to: 23817 #333

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/remote_infinity/remote_query_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct InfinityClient {
std::unique_ptr<InfinityServiceClient> client;
int64_t session_id;
InfinityClient() {
socket.reset(new TSocket("127.0.0.1", 9080));
socket.reset(new TSocket("127.0.0.1", 23817));
transport.reset(new TBufferedTransport(socket));
protocol.reset(new TBinaryProtocol(transport));
client = std::make_unique<InfinityServiceClient>(protocol);
Expand Down
4 changes: 2 additions & 2 deletions conf/infinity_conf.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ sdk_port = 23817

[system]
# 0 means use all cpus
worker_cpu_limit = 4
worker_cpu_limit = 0
# 0 means use all memory
total_memory_size = "8GB"
# memory limit per query
Expand All @@ -28,7 +28,7 @@ log_max_size = "10GB"
log_file_rotate_count = 10

# trace/info/warning/error/critical 5 log levels, default: info
log_level = "trace"
log_level = "warning"

[storage]
data_dir = "/tmp/infinity/data"
Expand Down
2 changes: 1 addition & 1 deletion python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pip install dist/infinity-0.0.1-py3-none-any.whl
import infinity
from infinity import NetworkAddress

infinity_obj = infinity.connect(NetworkAddress('127.0.0.1', 9080))
infinity_obj = infinity.connect(NetworkAddress('127.0.0.1', 23817))

# infinity
res = infinity_obj.create_database("my_db")
Expand Down
5 changes: 2 additions & 3 deletions python/benchmark/remote_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,12 @@ def process_pool(threads, rounds, query_path, tabel_name):
queries = fvecs_read_all(query_path)

for i in range(rounds):
start = time.time()
p = multiprocessing.Pool(threads)
for idx, query_vec in enumerate(queries):
start = time.time()
for idx, query_vec in enumerate(fvecs_read(query_path)):
p.apply_async(work, args=(query_vec, 100, "l2", "col1", "float", tabel_name))
p.close()
p.join()

end = time.time()
dur = end - start
results.append(f"Round {i + 1}:")
Expand Down
2 changes: 1 addition & 1 deletion python/benchmark/test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def drop_index(infinity_obj, port, process_id, thread_id, num_iteration):

ip: str = '0.0.0.0'
thrift = ("Thrift", ip, 9090)
thread_pool_thrift = ("Thread Pool Thrift", ip, 9080)
thread_pool_thrift = ("Thread Pool Thrift", ip, 23817)
async_thrift = ("AsyncThrift", ip, 9070)
num_processes = 16
num_threads = 16
Expand Down
2 changes: 1 addition & 1 deletion python/infinity/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
def connect(
uri: URI = LOCAL_HOST
) -> InfinityConnection:
if isinstance(uri, NetworkAddress) and (uri.port == 9090 or uri.port == 9080 or uri.port == 9070):
if isinstance(uri, NetworkAddress) and (uri.port == 9090 or uri.port == 23817 or uri.port == 9070):
return RemoteThriftInfinityConnection(uri)
else:
raise Exception(f"unknown uri: {uri}")
4 changes: 2 additions & 2 deletions python/infinity/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ def __str__(self):
URI = Union[NetworkAddress, Path]
VEC = Union[list, np.ndarray]

REMOTE_HOST = NetworkAddress("127.0.0.1", 9080)
LOCAL_HOST = NetworkAddress("0.0.0.0", 9080)
REMOTE_HOST = NetworkAddress("127.0.0.1", 23817)
LOCAL_HOST = NetworkAddress("0.0.0.0", 23817)
4 changes: 2 additions & 2 deletions python/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_connection(self):
method: connect server
expect: connect and disconnect successfully
"""
ports = [9090, 9080, 9070]
ports = [9090, 23817, 9070]
for port in ports:
infinity_obj = infinity.connect(REMOTE_HOST)
assert infinity_obj
Expand Down Expand Up @@ -73,7 +73,7 @@ def test_infinity_thrift(self):
12.
expect: all operations successfully
"""
ports = [9080]
ports = [23817]
for port in ports:
infinity_obj = infinity.connect(REMOTE_HOST)
assert infinity_obj
Expand Down
2 changes: 1 addition & 1 deletion python/test/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_import(self):
method: connect server, create table, import data, search, drop table, disconnect
expect: all operations successfully
"""
ports = [9080]
ports = [23817]
for port in ports:
infinity_obj = infinity.connect(REMOTE_HOST)
assert infinity_obj
Expand Down
9 changes: 7 additions & 2 deletions src/bin/infinity_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import stl;
import third_party;
import db_server;
import infinity_exception;
import infinity_context;

namespace {

Expand Down Expand Up @@ -145,13 +146,17 @@ auto main(int argc, char **argv) -> int {
StartupParameter parameters;
ParseArguments(argc, argv, parameters);

db_server.Init(parameters);
InfinityContext::instance().Init(parameters.config_path);

RegisterSignal();

InfinityContext::instance().config()->PrintAll();

// threaded_thrift_server.Init(9090);
// threaded_thrift_thread = infinity::Thread([&]() { threaded_thrift_server.Start(); });
u32 thrift_server_port = InfinityContext::instance().config()->sdk_port();

pool_thrift_server.Init(9080, 128);
pool_thrift_server.Init(thrift_server_port, 128);
pool_thrift_thread = infinity::Thread([&]() { pool_thrift_server.Start(); });

// non_block_pool_thrift_server.Init(9070, 64);
Expand Down
2 changes: 1 addition & 1 deletion src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ SharedPtr<String> Config::Init(const SharedPtr<String> &config_path) {
// Resource
{ system_option_.resource_dict_path_ = default_resource_dict_path; }
} else {
Printf("Read config from: {}", *config_path);
Printf("Read config from: {}\n", *config_path);
TomlTable config = TomlParseFile(*config_path);
// General
{
Expand Down
20 changes: 4 additions & 16 deletions src/network/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,28 @@ import connection;

namespace infinity {

void DBServer::Init(const StartupParameter &parameter) { config_path_ = Move(parameter.config_path); }

void DBServer::Run() {
if (initialized) {
return;
}

initialized = true;

InfinityContext::instance().Init(config_path_);

u16 pg_port = InfinityContext::instance().config()->pg_port();
const String &listen_address_ref = InfinityContext::instance().config()->listen_address();
const String &pg_listen_addr = InfinityContext::instance().config()->listen_address();

BoostErrorCode error;
AsioIpAddr address = asio_make_address(listen_address_ref, error);
AsioIpAddr address = asio_make_address(pg_listen_addr, error);
if (error) {
Printf("{} isn't a valid IPv4 address.\n", listen_address_ref);
Printf("{} isn't a valid IPv4 address.\n", pg_listen_addr);
infinity::InfinityContext::instance().UnInit();
return ;
}

acceptor_ptr_ = MakeUnique<AsioAcceptor>(io_service_, AsioEndPoint(address, pg_port));
CreateConnection();

if (config_path_) {
Printf("Start up database server, at: {} and port: {}, config: {}\n", listen_address_ref, pg_port, *config_path_);
} else {
Printf("Start up database server, at: {} and port: {}\n", listen_address_ref, pg_port);
}

InfinityContext::instance().config()->PrintAll();

Printf("Run 'psql -h {} -p {}' to connect to the server.\n", listen_address_ref, pg_port);
Printf("Run 'psql -h {} -p {}' to connect to the server.\n", pg_listen_addr, pg_port);

io_service_.run();
}
Expand Down
3 changes: 0 additions & 3 deletions src/network/db_server.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class ThriftServer;

export class DBServer {
public:
void Init(const StartupParameter& parameter);

void Run();

void Shutdown();
Expand All @@ -46,7 +44,6 @@ private:
atomic_u64 running_connection_count_{0};
AsioIOService io_service_{};
UniquePtr<AsioAcceptor> acceptor_ptr_{};
SharedPtr<String> config_path_{};
};

}
2 changes: 1 addition & 1 deletion src/network/thrift_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ void PoolThriftServer::Init(i32 port_no, i32 pool_size) {
threadManager->threadFactory(threadFactory);
threadManager->start();

std::cout << "Thrift server listen on: 0.0.0.0:" << port_no << ", thread pool: " << pool_size << std::endl;
std::cout << "API server listen on: 0.0.0.0:" << port_no << ", thread pool: " << pool_size << std::endl;

server =
MakeUnique<TThreadPoolServer>(MakeShared<infinity_thrift_rpc::InfinityServiceProcessorFactory>(MakeShared<InfinityServiceCloneFactory>()),
Expand Down
Loading