Skip to content

Commit

Permalink
Fix bugs (#2229)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Fix bug: check storage mode instead of role in txn commit. In
InfinityContext::ChangeRole, we first change role in cluster manager,
then set storage mode which makes old transaction that not committed
violate the check of role. So I changed to check storage mode.
2. Add log collect in embedded test.
3. Fix remove object file before upload.

https://github.com/infiniflow/infinity/actions/runs/11789675361/job/32838815976?pr=2212

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored Nov 13, 2024
1 parent 4507682 commit d990b18
Show file tree
Hide file tree
Showing 19 changed files with 170 additions and 33 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,14 @@ jobs:
- name: Test embedded infinity for Python 3.10
if: ${{ !cancelled() && !failure() }}
id: run_pysdk_local_infinity_test
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity/* && cp test/data/config/infinity_conf.toml /var/infinity && source /usr/local/venv310/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"

- name: Collect embedded infinity log
if: ${{ !cancelled() && failure()}} # run this step if previous steps failed
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cat /var/infinity/log/infinity.log"
run: |
failure="${{ steps.run_pysdk_local_infinity_test.outcome == 'failure'}}"
sudo python3 scripts/collect_embedded_log.py --output_dir=${RUNNER_WORKSPACE_PREFIX}/log --failure=${failure} --log_path=/var/infinity/log/infinity.log
- name: Start minio container
if: ${{ !cancelled() && !failure() }}
Expand Down Expand Up @@ -380,9 +383,9 @@ jobs:
if: ${{ !cancelled() && !failure() }}
run: |
rm -fr $PWD/db_tmp && mkdir -p $PWD/db_tmp && cat conf/infinity_conf.toml | sed -e "s|/var/infinity|$PWD/db_tmp|g" > $PWD/db_tmp/infinity_conf.toml && \
./cmake-build-release/benchmark/local_infinity/knn_import_benchmark sift $PWD/test/data $PWD/db_tmp && \
echo "1 50" | ./cmake-build-release/benchmark/local_infinity/knn_query_benchmark sift 200 false $PWD/test/data $PWD/db_tmp | tee benchmark_sift_1_thread.log && \
echo "8 50" | ./cmake-build-release/benchmark/local_infinity/knn_query_benchmark sift 200 false $PWD/test/data $PWD/db_tmp | tee benchmark_sift_8_threads.log && \
./cmake-build-release/benchmark/local_infinity/knn_import_benchmark sift $PWD/test/data $PWD/db_tmp db_tmp/infinity_conf.toml && \
echo "1 50" | ./cmake-build-release/benchmark/local_infinity/knn_query_benchmark sift 200 false $PWD/test/data $PWD/db_tmp db_tmp/infinity_conf.toml | tee benchmark_sift_1_thread.log && \
echo "8 50" | ./cmake-build-release/benchmark/local_infinity/knn_query_benchmark sift 200 false $PWD/test/data $PWD/db_tmp db_tmp/infinity_conf.toml | tee benchmark_sift_8_threads.log && \
./tools/ci_tools/check_benchmark_result.py ${RUNNER_WORKSPACE_PREFIX}/benchmark/golden_benchmark_sift_1_thread.log benchmark_sift_1_thread.log && \
./tools/ci_tools/check_benchmark_result.py ${RUNNER_WORKSPACE_PREFIX}/benchmark/golden_benchmark_sift_8_threads.log benchmark_sift_8_threads.log
Expand Down
7 changes: 6 additions & 1 deletion benchmark/local_infinity/knn/knn_import_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,19 @@ int main(int argc, char *argv[]) {
data_path = std::string(argv[3]);
}

std::string config_path;
if (argc >= 5) {
config_path = std::string(argv[4]);
}

if (VirtualStore::Exists(data_path)) {
std::cout << "Data path: " << data_path << " is already existed." << std::endl;
} else {
VirtualStore::MakeDirectory(data_path);
std::cout << "Data path: " << data_path << " is created." << std::endl;
}

Infinity::LocalInit(data_path);
Infinity::LocalInit(data_path, config_path);

std::cout << ">>> Import Benchmark Start <<<" << std::endl;

Expand Down
7 changes: 6 additions & 1 deletion benchmark/local_infinity/knn/knn_query_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ int main(int argc, char *argv[]) {
path = std::string(argv[5]);
}

Infinity::LocalInit(path);
std::string config_path;
if (argc >= 7) {
config_path = std::string(argv[6]);
}

Infinity::LocalInit(path, config_path);

std::cout << ">>> Query Benchmark Start <<<" << std::endl;
std::cout << "Thread Num: " << thread_num << ", Times: " << total_times << std::endl;
Expand Down
29 changes: 29 additions & 0 deletions conf/pytest_embedded_infinity_conf.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[general]
version = "0.5.0"
time_zone = "utc-8"

[network]
server_address = "0.0.0.0"
http_port = 23820
peer_ip = "0.0.0.0"
peer_port = 23850

[log]
log_filename = "infinity.log"
log_dir = "/var/infinity/log"
log_to_stdout = false
log_level = "trace"

[storage]
persistence_dir = "/var/infinity/persistence"

[buffer]
buffer_manager_size = "8GB"
temp_dir = "/var/infinity/tmp"
result_cache = "on"

[wal]
wal_dir = "/var/infinity/wal"

[resource]
resource_dir = "/var/infinity/resource"
6 changes: 3 additions & 3 deletions python/infinity_embedded/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
# import pkg_resources
# __version__ = pkg_resources.get_distribution("infinity_sdk").version

from infinity_embedded.common import URI, NetworkAddress, LOCAL_HOST, LOCAL_INFINITY_PATH, InfinityException
from infinity_embedded.common import URI, NetworkAddress, LOCAL_HOST, LOCAL_INFINITY_PATH, InfinityException, LOCAL_INFINITY_CONFIG_PATH
from infinity_embedded.infinity import InfinityConnection
from infinity_embedded.local_infinity.infinity import LocalInfinityConnection
from infinity_embedded.errors import ErrorCode

def connect(uri = LOCAL_INFINITY_PATH) -> InfinityConnection:
def connect(uri = LOCAL_INFINITY_PATH, config_path = LOCAL_INFINITY_CONFIG_PATH) -> InfinityConnection:
if isinstance(uri, str) and len(uri) != 0:
return LocalInfinityConnection(uri)
return LocalInfinityConnection(uri, config_path)
else:
raise InfinityException(ErrorCode.INVALID_SERVER_ADDRESS, f"Unknown uri: {uri}")
1 change: 1 addition & 0 deletions python/infinity_embedded/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __repr__(self):

# test embedded_infinity
LOCAL_INFINITY_PATH = "/var/infinity"
LOCAL_INFINITY_CONFIG_PATH = "conf/pytest_embedded_infinity_conf.toml"


class ConflictType(object):
Expand Down
6 changes: 3 additions & 3 deletions python/infinity_embedded/local_infinity/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from infinity_embedded.errors import ErrorCode as PyErrorCode
from infinity_embedded.common import LOCAL_INFINITY_PATH
from infinity_embedded.common import LOCAL_INFINITY_PATH, LOCAL_INFINITY_CONFIG_PATH
from infinity_embedded.embedded_infinity_ext import *
from typing import List

Expand Down Expand Up @@ -43,9 +43,9 @@ def __init__(self, error_code: PyErrorCode, error_msg: str, db_names=None, table


class LocalInfinityClient:
def __init__(self, path: str = LOCAL_INFINITY_PATH):
def __init__(self, path: str = LOCAL_INFINITY_PATH, config_path = LOCAL_INFINITY_CONFIG_PATH):
self.path = path
Infinity.LocalInit(path)
Infinity.LocalInit(path, config_path)
self.client = Infinity.LocalConnect()

def __del__(self):
Expand Down
6 changes: 3 additions & 3 deletions python/infinity_embedded/local_infinity/infinity.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from infinity_embedded import InfinityConnection
from abc import ABC
from infinity_embedded.common import ConflictType, LOCAL_INFINITY_PATH, InfinityException
from infinity_embedded.common import ConflictType, LOCAL_INFINITY_PATH, InfinityException, LOCAL_INFINITY_CONFIG_PATH
from infinity_embedded.local_infinity.client import LocalInfinityClient
from infinity_embedded.embedded_infinity_ext import ConflictType as LocalConflictType
from infinity_embedded.errors import ErrorCode
Expand All @@ -11,7 +11,7 @@


class LocalInfinityConnection(InfinityConnection, ABC):
def __init__(self, uri=LOCAL_INFINITY_PATH):
def __init__(self, uri=LOCAL_INFINITY_PATH, config_path=LOCAL_INFINITY_CONFIG_PATH):
if not os.path.exists(uri):
try:
logging.warning(f"Directory {uri} not found, try to create it")
Expand All @@ -20,7 +20,7 @@ def __init__(self, uri=LOCAL_INFINITY_PATH):
raise InfinityException(ErrorCode.DIR_NOT_FOUND, f"Directory {uri} not found and create failed: {e}")
if os.path.isdir(uri):
if os.access(uri, os.R_OK | os.W_OK):
self._client = LocalInfinityClient(uri)
self._client = LocalInfinityClient(uri, config_path)
self._is_connected = True
else:
raise InfinityException(ErrorCode.UNEXPECTED_ERROR,
Expand Down
7 changes: 7 additions & 0 deletions python/infinity_sdk/infinity/remote_thrift/infinity.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ def cleanup(self):
return res
else:
raise InfinityException(res.error_code, res.error_msg)

def optimize(self, db_name: str, table_name: str, optimize_opt: ttypes.OptimizeOptions):
res = self._client.optimize(db_name, table_name, optimize_opt)
if res.error_code == ErrorCode.OK:
return res
else:
raise InfinityException(res.error_code, res.error_msg)

def disconnect(self):
res = self._client.disconnect()
Expand Down
5 changes: 2 additions & 3 deletions python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ def check():
assert data_dict["count(star)"] == [13]

check()
# wait for optimize
time.sleep(3)
infinity_obj.optimize("default_db", "test_memidx1", optimize_opt = None)
check()

db_obj.drop_table("test_memidx1")
Expand Down Expand Up @@ -196,7 +195,7 @@ def part2(infinity_obj):

idx1_dir = idx1_dirs[0]
idx1_files = list(idx1_dir.glob("*"))
assert len(idx1_files) == 3
assert len(idx1_files) <= 3

idx2_dir = idx2_dirs[0]
idx2_files = list(idx2_dir.glob("*"))
Expand Down
39 changes: 39 additions & 0 deletions scripts/collect_embedded_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import argparse
import os
import random
import shutil
import string

parser = argparse.ArgumentParser(description="Collect and copy embedded log files.")
parser.add_argument(
"--output_dir", type=str, required=True, help="Path to the output directory"
)
parser.add_argument("--failure", type=str, required=True, help="If the test failured")
parser.add_argument(
"--log_path", type=str, required=True, help="Path to the embedded log file"
)

args = parser.parse_args()
output_dir = args.output_dir
failure = args.failure == "true" or args.failure == "True"
log_path = args.log_path
show_lines = 1000

print(f"Embedded test {"failed" if failure else "succeeded"}")

if not os.path.isdir(output_dir):
os.makedirs(output_dir)

if failure:
random_name = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
print(f"Random log file name: {random_name}")

if not os.path.isfile(log_path):
print("Error: log file not found")
elif failure:
shutil.copy(log_path, f"{output_dir}/{random_name}_embedded.log")
print(f"Last {show_lines} lines from {log_path}:")
with open(log_path, "r") as f:
lines = f.readlines()
for line in lines[-show_lines:]:
print(line.strip())
10 changes: 5 additions & 5 deletions src/main/infinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ u64 Infinity::GetSessionId() { return session_->session_id(); }

void Infinity::Hello() { fmt::print("hello infinity\n"); }

void Infinity::LocalInit(const String &path) {

SharedPtr<String> config_path = MakeShared<String>(std::filesystem::absolute(path + "/infinity_conf.toml"));
if (VirtualStore::Exists(*config_path)) {
InfinityContext::instance().Init(config_path);
void Infinity::LocalInit(const String &path, const String &config_path) {
if (!config_path.empty() && VirtualStore::Exists(config_path)) {
SharedPtr<String> config_path_ptr = MakeShared<String>(config_path);
InfinityContext::instance().Init(config_path_ptr);
} else {
LOG_WARN(fmt::format("Infinity::LocalInit cannot find config: {}", config_path));
UniquePtr<DefaultConfig> default_config = MakeUnique<DefaultConfig>();
default_config->default_log_dir_ = fmt::format("{}/log", path);
default_config->default_data_dir_ = fmt::format("{}/data", path);
Expand Down
2 changes: 1 addition & 1 deletion src/main/infinity.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public:

u64 GetSessionId();

static void LocalInit(const String &path);
static void LocalInit(const String &path, const String &config_path = "");

static void LocalUnInit();

Expand Down
13 changes: 13 additions & 0 deletions src/storage/io/object_storage_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
module;

#include <cassert>
#include <filesystem>

module object_storage_process;

Expand All @@ -26,6 +27,8 @@ import infinity_exception;
import third_party;
import virtual_store;

namespace fs = std::filesystem;

namespace infinity {

void ObjectStorageProcess::Start() {
Expand Down Expand Up @@ -95,6 +98,16 @@ void ObjectStorageProcess::Process() {
LOG_TRACE("Remove task done");
break;
}
case ObjectStorageTaskType::kLocalDrop: {
LOG_TRACE("Local drop task");
LocalDropTask *local_drop_task = static_cast<LocalDropTask *>(object_storage_task.get());
bool removed = fs::remove(local_drop_task->drop_path_);
if (!removed) {
LOG_WARN(fmt::format("ObjectStorageProcess::Process failed to remove file: {}", local_drop_task->drop_path_));
}
LOG_TRACE("Local drop task done");
break;
}
default: {
String error_message = fmt::format("Invalid object storage: {}", (u8)object_storage_task->type_);
UnrecoverableError(error_message);
Expand Down
19 changes: 18 additions & 1 deletion src/storage/io/object_storage_task.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ import stl;

namespace infinity {

export enum class ObjectStorageTaskType { kDownload, kUpload, kCopy, kRemove, kStopProcessor, kInvalid };
export enum class ObjectStorageTaskType {
kInvalid,
kDownload,
kUpload,
kCopy,
kRemove,
kStopProcessor,
kLocalDrop,
};

export struct BaseObjectStorageTask {
BaseObjectStorageTask(ObjectStorageTaskType type) : type_(type) {}
Expand Down Expand Up @@ -99,4 +107,13 @@ export struct StopObjectStorageProcessTask final : public BaseObjectStorageTask
String ToString() const final { return "Stop processor"; }
};

export struct LocalDropTask final : public BaseObjectStorageTask {
LocalDropTask(String drop_path) : BaseObjectStorageTask(ObjectStorageTaskType::kLocalDrop), drop_path_(drop_path) {}

~LocalDropTask() = default;

String ToString() const final { return "Local Drop Task"; }
String drop_path_;
};

} // namespace infinity
20 changes: 20 additions & 0 deletions src/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import infinity_context;
import object_storage_task;
import admin_statement;

namespace fs = std::filesystem;

namespace infinity {

StorageType String2StorageType(const String &storage_type) {
Expand Down Expand Up @@ -190,6 +192,24 @@ Status VirtualStore::DeleteFile(const String &file_name) {
return Status::OK();
}

Status VirtualStore::DeleteFileBG(const String &path) {
switch (VirtualStore::storage_type_) {
case StorageType::kMinio: {
auto drop_task = MakeShared<LocalDropTask>(path);
auto object_storage_processor = infinity::InfinityContext::instance().storage()->object_storage_processor();
object_storage_processor->Submit(drop_task);
drop_task->Wait();
break;
}
default: {
fs::remove(path);
break;
}
}

return Status::OK();
}

Status VirtualStore::MakeDirectory(const String &path) {
if (VirtualStore::Exists(path)) {
if (std::filesystem::is_directory(path)) {
Expand Down
1 change: 1 addition & 0 deletions src/storage/io/virtual_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public:
static bool IsRegularFile(const String &path);
static bool Exists(const String &path);
static Status DeleteFile(const String &path);
static Status DeleteFileBG(const String &path);
static Status MakeDirectory(const String &path);
static Status RemoveDirectory(const String &path);
static Status CleanupDirectory(const String &path);
Expand Down
Loading

0 comments on commit d990b18

Please sign in to comment.