Skip to content

Commit

Permalink
add sync to fs
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 15, 2024
1 parent bbaeb60 commit 0ccc595
Show file tree
Hide file tree
Showing 18 changed files with 221 additions and 216 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ add_subdirectory(third_party)
if(${BUILD_KUZU})
add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}")
add_definitions(-DKUZU_CMAKE_VERSION="${CMAKE_PROJECT_VERSION}")
add_definitions(-DKUZU_EXTENSION_VERSION="0.2.5")
add_definitions(-DKUZU_EXTENSION_VERSION="0.2.6")

include_directories(src/include)

Expand Down
87 changes: 46 additions & 41 deletions extension/httpfs/src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/cast.h"
#include "common/exception/io.h"
#include "common/exception/not_implemented.h"

namespace kuzu {
namespace httpfs {
Expand Down Expand Up @@ -126,81 +127,85 @@ bool HTTPFileSystem::canHandleFile(const std::string& path) const {
return path.rfind("https://", 0) == 0 || path.rfind("http://", 0) == 0;
}

void HTTPFileSystem::readFromFile(common::FileInfo* fileInfo, void* buffer, uint64_t numBytes,
void HTTPFileSystem::readFromFile(common::FileInfo& fileInfo, void* buffer, uint64_t numBytes,
uint64_t position) const {
auto httpFileInfo = ku_dynamic_cast<FileInfo*, HTTPFileInfo*>(fileInfo);
auto& httpFileInfo = ku_dynamic_cast<FileInfo&, HTTPFileInfo&>(fileInfo);
auto numBytesToRead = numBytes;
auto bufferOffset = 0;
if (position >= httpFileInfo->bufferStartPos && position < httpFileInfo->bufferEndPos) {
httpFileInfo->fileOffset = position;
httpFileInfo->bufferIdx = position - httpFileInfo->bufferStartPos;
httpFileInfo->availableBuffer =
(httpFileInfo->bufferEndPos - httpFileInfo->bufferStartPos) - httpFileInfo->bufferIdx;
if (position >= httpFileInfo.bufferStartPos && position < httpFileInfo.bufferEndPos) {
httpFileInfo.fileOffset = position;
httpFileInfo.bufferIdx = position - httpFileInfo.bufferStartPos;
httpFileInfo.availableBuffer =
(httpFileInfo.bufferEndPos - httpFileInfo.bufferStartPos) - httpFileInfo.bufferIdx;
} else {
httpFileInfo->availableBuffer = 0;
httpFileInfo->bufferIdx = 0;
httpFileInfo->fileOffset = position;
httpFileInfo.availableBuffer = 0;
httpFileInfo.bufferIdx = 0;
httpFileInfo.fileOffset = position;
}
while (numBytesToRead > 0) {
auto buffer_read_len = std::min<uint64_t>(httpFileInfo->availableBuffer, numBytesToRead);
auto buffer_read_len = std::min<uint64_t>(httpFileInfo.availableBuffer, numBytesToRead);
if (buffer_read_len > 0) {
KU_ASSERT(httpFileInfo->bufferStartPos + httpFileInfo->bufferIdx + buffer_read_len <=
httpFileInfo->bufferEndPos);
KU_ASSERT(httpFileInfo.bufferStartPos + httpFileInfo.bufferIdx + buffer_read_len <=
httpFileInfo.bufferEndPos);
memcpy((char*)buffer + bufferOffset,
httpFileInfo->readBuffer.get() + httpFileInfo->bufferIdx, buffer_read_len);
httpFileInfo.readBuffer.get() + httpFileInfo.bufferIdx, buffer_read_len);

bufferOffset += buffer_read_len;
numBytesToRead -= buffer_read_len;

httpFileInfo->bufferIdx += buffer_read_len;
httpFileInfo->availableBuffer -= buffer_read_len;
httpFileInfo->fileOffset += buffer_read_len;
httpFileInfo.bufferIdx += buffer_read_len;
httpFileInfo.availableBuffer -= buffer_read_len;
httpFileInfo.fileOffset += buffer_read_len;
}

if (numBytesToRead > 0 && httpFileInfo->availableBuffer == 0) {
auto newBufferAvailableSize = std::min<uint64_t>(httpFileInfo->READ_BUFFER_LEN,
httpFileInfo->length - httpFileInfo->fileOffset);
if (numBytesToRead > 0 && httpFileInfo.availableBuffer == 0) {
auto newBufferAvailableSize = std::min<uint64_t>(httpFileInfo.READ_BUFFER_LEN,
httpFileInfo.length - httpFileInfo.fileOffset);

// Bypass buffer if we read more than buffer size.
if (numBytesToRead > newBufferAvailableSize) {
getRangeRequest(httpFileInfo, httpFileInfo->path, {}, position + bufferOffset,
getRangeRequest(&httpFileInfo, httpFileInfo.path, {}, position + bufferOffset,
(char*)buffer + bufferOffset, numBytesToRead);
httpFileInfo->availableBuffer = 0;
httpFileInfo->bufferIdx = 0;
httpFileInfo->fileOffset += numBytesToRead;
httpFileInfo.availableBuffer = 0;
httpFileInfo.bufferIdx = 0;
httpFileInfo.fileOffset += numBytesToRead;
break;
} else {
getRangeRequest(httpFileInfo, httpFileInfo->path, {}, httpFileInfo->fileOffset,
(char*)httpFileInfo->readBuffer.get(), newBufferAvailableSize);
httpFileInfo->availableBuffer = newBufferAvailableSize;
httpFileInfo->bufferIdx = 0;
httpFileInfo->bufferStartPos = httpFileInfo->fileOffset;
httpFileInfo->bufferEndPos = httpFileInfo->bufferStartPos + newBufferAvailableSize;
getRangeRequest(&httpFileInfo, httpFileInfo.path, {}, httpFileInfo.fileOffset,
(char*)httpFileInfo.readBuffer.get(), newBufferAvailableSize);
httpFileInfo.availableBuffer = newBufferAvailableSize;
httpFileInfo.bufferIdx = 0;
httpFileInfo.bufferStartPos = httpFileInfo.fileOffset;
httpFileInfo.bufferEndPos = httpFileInfo.bufferStartPos + newBufferAvailableSize;
}
}
}
}

int64_t HTTPFileSystem::readFile(common::FileInfo* fileInfo, void* buf, size_t numBytes) const {
auto httpFileInfo = ku_dynamic_cast<FileInfo*, HTTPFileInfo*>(fileInfo);
auto maxNumBytesToRead = httpFileInfo->length - httpFileInfo->fileOffset;
int64_t HTTPFileSystem::readFile(common::FileInfo& fileInfo, void* buf, size_t numBytes) const {
auto& httpFileInfo = ku_dynamic_cast<FileInfo&, HTTPFileInfo&>(fileInfo);
auto maxNumBytesToRead = httpFileInfo.length - httpFileInfo.fileOffset;
numBytes = std::min<uint64_t>(maxNumBytesToRead, numBytes);
if (httpFileInfo->fileOffset > httpFileInfo->getFileSize()) {
if (httpFileInfo.fileOffset > httpFileInfo.getFileSize()) {
return 0;
}
readFromFile(fileInfo, buf, numBytes, httpFileInfo->fileOffset);
readFromFile(fileInfo, buf, numBytes, httpFileInfo.fileOffset);
return numBytes;
}

int64_t HTTPFileSystem::seek(common::FileInfo* fileInfo, uint64_t offset, int /*whence*/) const {
auto httpFileInfo = ku_dynamic_cast<FileInfo*, HTTPFileInfo*>(fileInfo);
httpFileInfo->fileOffset = offset;
// Syncing is not implemented for HTTPFileSystem: this override ignores its argument and
// unconditionally throws NotImplementedException.
void HTTPFileSystem::syncFile(const common::FileInfo& /*fileInfo*/) const {
    throw NotImplementedException("syncFile is not supported in HTTPFileSystem");
}

int64_t HTTPFileSystem::seek(common::FileInfo& fileInfo, uint64_t offset, int /*whence*/) const {
auto& httpFileInfo = ku_dynamic_cast<FileInfo&, HTTPFileInfo&>(fileInfo);
httpFileInfo.fileOffset = offset;
return offset;
}

uint64_t HTTPFileSystem::getFileSize(common::FileInfo* fileInfo) const {
auto httpFileInfo = ku_dynamic_cast<FileInfo*, HTTPFileInfo*>(fileInfo);
return httpFileInfo->length;
uint64_t HTTPFileSystem::getFileSize(const common::FileInfo& fileInfo) const {
auto& httpFileInfo = ku_dynamic_cast<const FileInfo&, const HTTPFileInfo&>(fileInfo);
return httpFileInfo.length;
}

std::unique_ptr<httplib::Client> HTTPFileSystem::getClient(const std::string& host) {
Expand Down
10 changes: 6 additions & 4 deletions extension/httpfs/src/include/httpfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,17 @@ class HTTPFileSystem : public common::FileSystem {

static std::unique_ptr<httplib::Headers> getHTTPHeaders(HeaderMap& headerMap);

void syncFile(const common::FileInfo& fileInfo) const override;

protected:
void readFromFile(common::FileInfo* fileInfo, void* buffer, uint64_t numBytes,
void readFromFile(common::FileInfo& fileInfo, void* buffer, uint64_t numBytes,
uint64_t position) const override;

int64_t readFile(common::FileInfo* fileInfo, void* buf, size_t numBytes) const override;
int64_t readFile(common::FileInfo& fileInfo, void* buf, size_t numBytes) const override;

int64_t seek(common::FileInfo* fileInfo, uint64_t offset, int whence) const override;
int64_t seek(common::FileInfo& fileInfo, uint64_t offset, int whence) const override;

uint64_t getFileSize(common::FileInfo* fileInfo) const override;
uint64_t getFileSize(const common::FileInfo& fileInfo) const override;

static std::pair<std::string, std::string> parseUrl(const std::string& url);

Expand Down
2 changes: 1 addition & 1 deletion extension/httpfs/src/include/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class S3FileSystem final : public HTTPFileSystem {

std::string initializeMultiPartUpload(S3FileInfo* fileInfo) const;

void writeFile(common::FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes,
void writeFile(common::FileInfo& fileInfo, const uint8_t* buffer, uint64_t numBytes,
uint64_t offset) const override;

std::shared_ptr<S3WriteBuffer> allocateWriteBuffer(uint16_t writeBufferIdx, uint64_t partSize,
Expand Down
20 changes: 10 additions & 10 deletions extension/httpfs/src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,10 @@ std::string S3FileSystem::initializeMultiPartUpload(S3FileInfo* fileInfo) const
return getUploadID(result);
}

void S3FileSystem::writeFile(common::FileInfo* fileInfo, const uint8_t* buffer, uint64_t numBytes,
void S3FileSystem::writeFile(common::FileInfo& fileInfo, const uint8_t* buffer, uint64_t numBytes,
uint64_t offset) const {
auto s3FileInfo = ku_dynamic_cast<FileInfo*, S3FileInfo*>(fileInfo);
if (!((s3FileInfo->flags & O_ACCMODE) & O_WRONLY)) {
auto& s3FileInfo = ku_dynamic_cast<FileInfo&, S3FileInfo&>(fileInfo);
if (!((s3FileInfo.flags & O_ACCMODE) & O_WRONLY)) {
throw IOException("Write called on a file which is not open in write mode.");
}
uint64_t numBytesWritten = 0;
Expand All @@ -468,20 +468,20 @@ void S3FileSystem::writeFile(common::FileInfo* fileInfo, const uint8_t* buffer,
// We use amazon multipart upload API which segments an object into a set of parts. Since we
// don't track the usage of individual part, determining whether we can upload a part is
// challenging if we allow non-sequential write.
if (currOffset != s3FileInfo->fileOffset) {
if (currOffset != s3FileInfo.fileOffset) {
throw InternalException("Non-sequential write not supported.");
}
auto writeBufferIdx = currOffset / s3FileInfo->partSize;
auto writeBuffer = s3FileInfo->getBuffer(writeBufferIdx);
auto writeBufferIdx = currOffset / s3FileInfo.partSize;
auto writeBuffer = s3FileInfo.getBuffer(writeBufferIdx);
auto offsetToWrite = currOffset - writeBuffer->startOffset;
auto numBytesToWrite =
std::min<uint64_t>(numBytes - numBytesWritten, s3FileInfo->partSize - offsetToWrite);
std::min<uint64_t>(numBytes - numBytesWritten, s3FileInfo.partSize - offsetToWrite);
memcpy(writeBuffer->getData() + offsetToWrite, buffer + numBytesWritten, numBytesToWrite);
writeBuffer->numBytesWritten += numBytesToWrite;
if (writeBuffer->numBytesWritten >= s3FileInfo->partSize) {
flushBuffer(s3FileInfo, writeBuffer);
if (writeBuffer->numBytesWritten >= s3FileInfo.partSize) {
flushBuffer(&s3FileInfo, writeBuffer);
}
s3FileInfo->fileOffset += numBytesToWrite;
s3FileInfo.fileOffset += numBytesToWrite;
numBytesWritten += numBytesToWrite;
}
}
Expand Down
18 changes: 11 additions & 7 deletions src/common/file_system/file_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,32 @@
namespace kuzu {
namespace common {

uint64_t FileInfo::getFileSize() {
return fileSystem->getFileSize(this);
uint64_t FileInfo::getFileSize() const {
return fileSystem->getFileSize(*this);
}

void FileInfo::readFromFile(void* buffer, uint64_t numBytes, uint64_t position) {
fileSystem->readFromFile(this, buffer, numBytes, position);
fileSystem->readFromFile(*this, buffer, numBytes, position);
}

int64_t FileInfo::readFile(void* buf, size_t nbyte) {
return fileSystem->readFile(this, buf, nbyte);
return fileSystem->readFile(*this, buf, nbyte);
}

void FileInfo::writeFile(const uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
fileSystem->writeFile(this, buffer, numBytes, offset);
fileSystem->writeFile(*this, buffer, numBytes, offset);
}

void FileInfo::syncFile() const {
fileSystem->syncFile(*this);
}

int64_t FileInfo::seek(uint64_t offset, int whence) {
return fileSystem->seek(this, offset, whence);
return fileSystem->seek(*this, offset, whence);
}

void FileInfo::truncate(uint64_t size) {
fileSystem->truncate(this, size);
fileSystem->truncate(*this, size);
}

} // namespace common
Expand Down
4 changes: 2 additions & 2 deletions src/common/file_system/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ std::string FileSystem::getFileExtension(const std::filesystem::path& path) {
return path.extension().string();
}

void FileSystem::writeFile(FileInfo* /*fileInfo*/, const uint8_t* /*buffer*/, uint64_t /*numBytes*/,
void FileSystem::writeFile(FileInfo& /*fileInfo*/, const uint8_t* /*buffer*/, uint64_t /*numBytes*/,
uint64_t /*offset*/) const {
KU_UNREACHABLE;
}

void FileSystem::truncate(FileInfo* /*fileInfo*/, uint64_t /*size*/) const {
void FileSystem::truncate(FileInfo& /*fileInfo*/, uint64_t /*size*/) const {
KU_UNREACHABLE;
}

Expand Down
Loading

0 comments on commit 0ccc595

Please sign in to comment.