Skip to content

Commit

Permalink
Replace Linux open file flag with kuzu open file flag (#2931)
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Oct 13, 2024
1 parent 492d687 commit 7d39282
Show file tree
Hide file tree
Showing 28 changed files with 139 additions and 112 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ src/antlr4/.antlr/
# macOS
.DS_Store

# Dataset
dataset/ldbc-1

### Node.js
tools/nodejs_api/node_modules/
tools/nodejs_api/cmake_install.cmake
Expand Down
5 changes: 3 additions & 2 deletions extension/httpfs/src/cached_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ std::unique_ptr<FileInfo> CachedFileManager::getCachedFileInfo(HTTPFileInfo* htt
auto fileName = FileSystem::getFileName(httpFileInfo->path);
auto cacheFilePath = getCachedFilePath(fileName, transactionID);
if (!vfs->fileOrPathExists(cacheFilePath)) {
auto cacheFileInfo = vfs->openFile(cacheFilePath, O_CREAT | O_RDWR);
auto cacheFileInfo = vfs->openFile(cacheFilePath,
FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE);
downloadFile(httpFileInfo, cacheFileInfo.get());
}
return vfs->openFile(cacheFilePath, O_RDONLY);
return vfs->openFile(cacheFilePath, FileFlags::READ_ONLY);
}

void CachedFileManager::cleanUP(main::ClientContext* context) {
Expand Down
11 changes: 5 additions & 6 deletions extension/httpfs/src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@ void HTTPFileInfo::initialize(main::ClientContext* context) {
auto res = hfs->headRequest(this->ptrCast<HTTPFileInfo>(), path, {});
std::string rangeLength;
if (res->code != 200) {
auto accessMode = flags & O_ACCMODE;
if ((accessMode & O_WRONLY) && res->code == 404) {
if (!(flags & O_CREAT)) {
if ((flags & FileFlags::WRITE) && res->code == 404) {
if (!(flags & FileFlags::CREATE_IF_NOT_EXISTS)) {
throw IOException(stringFormat("Unable to open URL: \"{}\" for writing: file does "
"not exist and CREATE flag is not set",
path));
}
length = 0;
return;
} else if ((accessMode & O_RDONLY) && res->code != 404) {
} else if ((flags & FileFlags::READ_ONLY) && res->code != 404) {
// HEAD request fail, use Range request for another try (read only one byte).
auto rangeRequest =
hfs->getRangeRequest(this, this->path, {}, 0, nullptr /* buffer */, 2);
Expand Down Expand Up @@ -77,7 +76,7 @@ void HTTPFileInfo::initialize(main::ClientContext* context) {
}

// Initialize the read buffer now that we know the file exists
if ((flags & O_ACCMODE) == O_RDONLY) {
if (flags & FileFlags::READ_ONLY) {
readBuffer = std::make_unique<uint8_t[]>(READ_BUFFER_LEN);
}

Expand Down Expand Up @@ -139,7 +138,7 @@ bool HTTPFileSystem::canHandleFile(const std::string& path) const {

bool HTTPFileSystem::fileOrPathExists(const std::string& path, main::ClientContext* context) {
try {
auto fileInfo = openFile(path, O_RDONLY, context, FileLockType::READ_LOCK);
auto fileInfo = openFile(path, FileFlags::READ_ONLY, context, FileLockType::READ_LOCK);
auto httpFileInfo = fileInfo->constPtrCast<HTTPFileInfo>();
if (httpFileInfo->length == 0) {
return false;
Expand Down
6 changes: 2 additions & 4 deletions extension/json/src/json_export.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "json_export.h"

#include <fcntl.h>

#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_serializer.h"
#include "function/export/export_function.h"
Expand Down Expand Up @@ -35,8 +33,8 @@ struct ExportJSONSharedState : public ExportFuncSharedState {
std::vector<std::string> jsonValues;

void init(main::ClientContext& context, const ExportFuncBindData& bindData) override {
fileInfo = context.getVFSUnsafe()->openFile(bindData.fileName, O_WRONLY | O_CREAT | O_TRUNC,
&context);
fileInfo = context.getVFSUnsafe()->openFile(bindData.fileName,
FileFlags::WRITE | FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS, &context);
}
};

Expand Down
5 changes: 1 addition & 4 deletions extension/json/src/json_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#include "json_utils.h"

#include <fcntl.h>

#include <cstdlib>

#include "common/exception/binder.h"
#include "common/exception/not_implemented.h"
#include "common/exception/runtime.h"
#include "common/file_system/virtual_file_system.h"
Expand Down Expand Up @@ -657,7 +654,7 @@ static JsonWrapper fileToJsonUnstructuredFormatted(std::shared_ptr<char[]> buffe
JsonWrapper fileToJson(main::ClientContext* context, const std::string& path,
JsonScanFormat format) {

auto file = context->getVFSUnsafe()->openFile(path, O_RDONLY, context);
auto file = context->getVFSUnsafe()->openFile(path, FileFlags::READ_ONLY, context);
auto fileSize = file->getFileSize();
auto buffer = std::make_shared<char[]>(fileSize + 9);
memset(buffer.get() + fileSize, 0 /* valueToSet */, 9 /* len */);
Expand Down
1 change: 0 additions & 1 deletion scripts/antlr4/hash.md5
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
e9d15715485e7edcd27e1374e1ef57cc -
6 changes: 2 additions & 4 deletions src/binder/bind/bind_import_database.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <fcntl.h>

#include "binder/binder.h"
#include "binder/bound_import_database.h"
#include "common/copier_config/csv_reader_config.h"
Expand All @@ -22,9 +20,9 @@ static std::string getQueryFromFile(common::VirtualFileSystem* vfs, const std::s
if (!vfs->fileOrPathExists(filePath, context)) {
throw BinderException(stringFormat("File {} does not exist.", filePath));
}
auto fileInfo = vfs->openFile(filePath, O_RDONLY
auto fileInfo = vfs->openFile(filePath, FileFlags::READ_ONLY
#ifdef _WIN32
| _O_BINARY
| FileFlags::BINARY
#endif
);
auto fsize = fileInfo->getFileSize();
Expand Down
9 changes: 4 additions & 5 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "catalog/catalog.h"

#include <fcntl.h>

#include "binder/ddl/bound_alter_info.h"
#include "binder/ddl/bound_create_sequence_info.h"
#include "binder/ddl/bound_create_table_info.h"
Expand Down Expand Up @@ -473,7 +471,8 @@ void Catalog::saveToFile(const std::string& directory, VirtualFileSystem* fs,
FileVersionType versionType) const {
KU_ASSERT(!directory.empty());
const auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, versionType);
const auto catalogFile = fs->openFile(catalogPath, O_WRONLY | O_CREAT);
const auto catalogFile = fs->openFile(catalogPath,
FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE);
Serializer serializer(std::make_unique<BufferedFileWriter>(*catalogFile));
writeMagicBytes(serializer);
serializer.serializeValue(StorageVersionInfo::getStorageVersion());
Expand All @@ -487,8 +486,8 @@ void Catalog::readFromFile(const std::string& directory, VirtualFileSystem* fs,
FileVersionType versionType, main::ClientContext* context) {
KU_ASSERT(!directory.empty());
const auto catalogPath = StorageUtils::getCatalogFilePath(fs, directory, versionType);
Deserializer deserializer(
std::make_unique<BufferedFileReader>(fs->openFile(catalogPath, O_RDONLY, context)));
Deserializer deserializer(std::make_unique<BufferedFileReader>(
fs->openFile(catalogPath, FileFlags::READ_ONLY, context)));
validateMagicBytes(deserializer);
storage_version_t savedStorageVersion;
deserializer.deserializeValue(savedStorageVersion);
Expand Down
60 changes: 48 additions & 12 deletions src/common/file_system/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,63 @@ LocalFileInfo::~LocalFileInfo() {
#endif
}

static void validateFileFlags(uint8_t flags) {
const bool isRead = flags & FileFlags::READ_ONLY;
const bool isWrite = flags & FileFlags::WRITE;
KU_UNUSED(isRead);
KU_UNUSED(isWrite);
// Require either READ or WRITE (or both).
KU_ASSERT(isRead || isWrite);
// CREATE flags require writing.
KU_ASSERT(isWrite || !(flags & FileFlags::CREATE_IF_NOT_EXISTS));
KU_ASSERT(isWrite || !(flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS));
// CREATE_IF_NOT_EXISTS and CREATE_AND_TRUNCATE_IF_EXISTS flags cannot be combined.
KU_ASSERT(!(flags & FileFlags::CREATE_IF_NOT_EXISTS &&
flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS));
}

std::unique_ptr<FileInfo> LocalFileSystem::openFile(const std::string& path, int flags,
main::ClientContext* context, FileLockType lock_type) {
auto fullPath = expandPath(context, path);
#if defined(_WIN32)
auto dwDesiredAccess = 0ul;
int dwCreationDisposition;
if (flags & FileFlags::CREATE_IF_NOT_EXISTS) {
dwCreationDisposition = OPEN_ALWAYS;
} else if (flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS) {
dwCreationDisposition = CREATE_ALWAYS;
validateFileFlags(flags);

int openFlags = 0;
bool readMode = flags & FileFlags::READ_ONLY;
bool writeMode = flags & FileFlags::WRITE;
if (readMode && writeMode) {
openFlags = O_RDWR;
} else if (readMode) {
openFlags = O_RDONLY;
} else if (writeMode) {
openFlags = O_WRONLY;
} else {
dwCreationDisposition = OPEN_EXISTING;
// LCOV_EXCL_START
throw InternalException("READ, WRITE or both should be specified when opening a file.");
// LCOV_EXCL_STOP
}
if (writeMode) {
KU_ASSERT(flags & FileFlags::WRITE);
if (flags & FileFlags::CREATE_IF_NOT_EXISTS) {
openFlags |= O_CREAT;
} else if (flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS) {
openFlags |= O_CREAT | O_TRUNC;
}
}

#if defined(_WIN32)
auto dwDesiredAccess = 0ul;
auto dwCreationDisposition = (openFlags & O_CREAT) ? OPEN_ALWAYS : OPEN_EXISTING;
auto dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
if (flags & (O_CREAT | O_WRONLY | O_RDWR)) {
if (openFlags & (O_CREAT | O_WRONLY | O_RDWR)) {
dwDesiredAccess |= GENERIC_WRITE;
}
// O_RDONLY is 0 in practice, so flags & (O_RDONLY | O_RDWR) doesn't work.
if (!(flags & O_WRONLY)) {
// O_RDONLY is 0 in practice, so openFlags & (O_RDONLY | O_RDWR) doesn't work.
if (!(openFlags & O_WRONLY)) {
dwDesiredAccess |= GENERIC_READ;
}
if (openFlags & FileFlags::BINARY) {
dwDesiredAccess |= _O_BINARY;
}

HANDLE handle = CreateFileA(fullPath.c_str(), dwDesiredAccess, dwShareMode, nullptr,
dwCreationDisposition, FILE_ATTRIBUTE_NORMAL, nullptr);
Expand All @@ -78,7 +114,7 @@ std::unique_ptr<FileInfo> LocalFileSystem::openFile(const std::string& path, int
}
return std::make_unique<LocalFileInfo>(fullPath, handle, this);
#else
int fd = open(fullPath.c_str(), flags, 0644);
int fd = open(fullPath.c_str(), openFlags, 0644);
if (fd == -1) {
throw IOException(stringFormat("Cannot open file {}: {}", fullPath, posixErrMessage()));
}
Expand Down
6 changes: 2 additions & 4 deletions src/function/export/export_csv_function.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <fcntl.h>

#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_serializer.h"
#include "function/cast/vector_cast_functions.h"
Expand Down Expand Up @@ -110,8 +108,8 @@ struct ExportCSVSharedState : public ExportFuncSharedState {
ExportCSVSharedState() = default;

void init(main::ClientContext& context, const ExportFuncBindData& bindData) override {
fileInfo = context.getVFSUnsafe()->openFile(bindData.fileName, O_WRONLY | O_CREAT | O_TRUNC,
&context);
fileInfo = context.getVFSUnsafe()->openFile(bindData.fileName,
FileFlags::WRITE | FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS, &context);
writeHeader(bindData);
}

Expand Down
16 changes: 16 additions & 0 deletions src/include/common/file_system/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ namespace common {

enum class FileLockType : uint8_t { NO_LOCK = 0, READ_LOCK = 1, WRITE_LOCK = 2 };

struct FileFlags {
static constexpr uint8_t READ_ONLY = 1 << 0;
static constexpr uint8_t WRITE = 1 << 1;
// Create file if not exists, can only be used together with WRITE
static constexpr uint8_t CREATE_IF_NOT_EXISTS = 1 << 3;
// Always create a new file. If a file exists, the file is truncated. Cannot be used together
// with CREATE_IF_NOT_EXISTS.
static constexpr uint8_t CREATE_AND_TRUNCATE_IF_EXISTS = 1 << 4;
// Temporary file that is not persisted to disk.
static constexpr uint8_t TEMPORARY = 1 << 5;
#ifdef _WIN32
// Only used in windows to open files in binary mode.
static constexpr uint8_t BINARY = 1 << 5;
#endif
};

class KUZU_API FileSystem {
friend struct FileInfo;

Expand Down
6 changes: 2 additions & 4 deletions src/include/storage/storage_structure/overflow_file.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <fcntl.h>

#include <cstdint>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -59,7 +57,7 @@ class OverflowFileHandle {
const std::function<void(uint8_t*)>& func) const;

private:
static const common::page_idx_t END_OF_PAGE =
static constexpr common::page_idx_t END_OF_PAGE =
common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t);
// This is the index of the last free byte to which we can write.
PageCursor& nextPosToWriteTo;
Expand Down Expand Up @@ -133,7 +131,7 @@ class OverflowFile {
void writePageToDisk(common::page_idx_t pageIdx, uint8_t* data) const;

protected:
static const uint64_t HEADER_PAGE_IDX = 0;
static constexpr uint64_t HEADER_PAGE_IDX = 0;

std::vector<std::unique_ptr<OverflowFileHandle>> handles;
StringOverflowFileHeader header;
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/wal_replayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ class WALReplayer {
public:
explicit WALReplayer(main::ClientContext& clientContext);

void replay();
void replay() const;

private:
void replayWALRecord(const WALRecord& walRecord);
void replayWALRecord(const WALRecord& walRecord) const;
void replayCreateTableEntryRecord(const WALRecord& walRecord) const;
void replayCreateCatalogEntryRecord(const WALRecord& walRecord) const;
void replayDropCatalogEntryRecord(const WALRecord& walRecord) const;
Expand Down
5 changes: 3 additions & 2 deletions src/main/attached_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ void AttachedKuzuDatabase::initCatalog(const std::string& path, ClientContext* c
}

static void validateEmptyWAL(const std::string& path, ClientContext* context) {
auto walFile = context->getVFSUnsafe()->openFile(
path + "/" + common::StorageConstants::WAL_FILE_SUFFIX, O_RDONLY, context);
auto walFile =
context->getVFSUnsafe()->openFile(path + "/" + common::StorageConstants::WAL_FILE_SUFFIX,
common::FileFlags::READ_ONLY, context);
if (walFile->getFileSize() > 0) {
throw common::RuntimeException(common::stringFormat(
"Cannot attach an external Kùzu database with non-empty wal file. Try manually "
Expand Down
4 changes: 2 additions & 2 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ SystemConfig::SystemConfig(uint64_t bufferPoolSize_, uint64_t maxNumThreads, boo
}

static void getLockFileFlagsAndType(bool readOnly, bool createNew, int& flags, FileLockType& lock) {
flags = readOnly ? O_RDONLY : O_RDWR;
flags = readOnly ? FileFlags::READ_ONLY : FileFlags::WRITE;
if (createNew && !readOnly) {
flags |= O_CREAT;
flags |= FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS;
}
lock = readOnly ? FileLockType::READ_LOCK : FileLockType::WRITE_LOCK;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "processor/operator/persistent/reader/csv/base_csv_reader.h"

#include <fcntl.h>

#include <vector>

#include "common/exception/copy.h"
Expand All @@ -20,9 +18,9 @@ BaseCSVReader::BaseCSVReader(const std::string& filePath, common::CSVOption opti
: option{std::move(option)}, numColumns{numColumns}, buffer{nullptr}, bufferSize{0},
position{0}, osFileOffset{0}, rowEmpty{false}, context{context} {
fileInfo = context->getVFSUnsafe()->openFile(filePath,
O_RDONLY
FileFlags::READ_ONLY
#ifdef _WIN32
| _O_BINARY
| FileFlags::BINARY
#endif
,
context);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"

#include <fcntl.h>

#include "common/exception/binder.h"
#include "common/exception/copy.h"
#include "common/file_system/virtual_file_system.h"
Expand Down Expand Up @@ -33,7 +31,7 @@ void ParquetReader::initializeScan(ParquetReaderScanState& state,
state.groupIdxList = std::move(groups_to_read);
if (!state.fileInfo || state.fileInfo->path != filePath) {
state.prefetchMode = false;
state.fileInfo = vfs->openFile(filePath, O_RDONLY, context);
state.fileInfo = vfs->openFile(filePath, FileFlags::READ_ONLY, context);
}

state.thriftFileProto = createThriftProtocol(state.fileInfo.get(), state.prefetchMode);
Expand Down Expand Up @@ -170,7 +168,7 @@ void ParquetReader::scan(processor::ParquetReaderScanState& state, DataChunk& re
}

void ParquetReader::initMetadata() {
auto fileInfo = context->getVFSUnsafe()->openFile(filePath, O_RDONLY, context);
auto fileInfo = context->getVFSUnsafe()->openFile(filePath, FileFlags::READ_ONLY, context);
auto proto = createThriftProtocol(fileInfo.get(), false);
auto& transport =
ku_dynamic_cast<kuzu_apache::thrift::transport::TTransport&, ThriftFileTransport&>(
Expand Down
Loading

0 comments on commit 7d39282

Please sign in to comment.