Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 28, 2024
1 parent 27c5ef5 commit d83daf3
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 10 deletions.
1 change: 1 addition & 0 deletions extension/httpfs/src/cached_file_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ std::string CachedFileManager::getCachedDirForTrx(common::transaction_t transact
}

void CachedFileManager::downloadFile(HTTPFileInfo* fileToDownload, FileInfo* cacheFileInfo) {
fileToDownload->initMetadata();
uint64_t numBytesRead;
uint64_t offsetToWrite = 0;
do {
Expand Down
30 changes: 23 additions & 7 deletions extension/httpfs/src/httpfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ HTTPFileInfo::HTTPFileInfo(std::string path, FileSystem* fileSystem, int flags,
bufferIdx{0}, fileOffset{0}, bufferStartPos{0}, bufferEndPos{0}, httpConfig{context},
cachedFileInfo{nullptr} {}

void HTTPFileInfo::initialize(main::ClientContext* context) {
initializeClient();
void HTTPFileInfo::initMetadata() {
auto hfs = fileSystem->ptrCast<HTTPFileSystem>();
initializeClient();
auto res = hfs->headRequest(this->ptrCast<HTTPFileInfo>(), path, {});
std::string rangeLength;
if (res->code != 200) {
Expand Down Expand Up @@ -105,10 +105,16 @@ void HTTPFileInfo::initialize(main::ClientContext* context) {
// LCOV_EXCL_STOP
}
}
}

// Prepares this file handle for use.
// Without caching (httpConfig.cacheFile is false) the remote metadata is fetched right away via
// initMetadata(). With caching enabled, the handle is instead bound to the cached file info that
// the file system's cached file manager provides for the current transaction, and
// initMetadata() is not called here.
void HTTPFileInfo::initialize(main::ClientContext* context) {
    if (!httpConfig.cacheFile) {
        initMetadata();
        return;
    }
    auto httpFS = fileSystem->ptrCast<HTTPFileSystem>();
    // The cached file info is looked up per transaction id.
    const auto transactionID = context->getTx()->getID();
    cachedFileInfo = httpFS->getCachedFileManager().getCachedFileInfo(this, transactionID);
}

void HTTPFileInfo::initializeClient() {
Expand Down Expand Up @@ -155,7 +161,7 @@ void HTTPFileSystem::cleanUP(main::ClientContext* context) {

void HTTPFileSystem::readFromFile(common::FileInfo& fileInfo, void* buffer, uint64_t numBytes,
uint64_t position) const {
auto& httpFileInfo = ku_dynamic_cast<FileInfo&, HTTPFileInfo&>(fileInfo);
auto& httpFileInfo = fileInfo.cast<HTTPFileInfo>();
auto numBytesToRead = numBytes;
auto bufferOffset = 0;
if (httpFileInfo.cachedFileInfo != nullptr) {
Expand Down Expand Up @@ -214,7 +220,10 @@ void HTTPFileSystem::readFromFile(common::FileInfo& fileInfo, void* buffer, uint
}

int64_t HTTPFileSystem::readFile(common::FileInfo& fileInfo, void* buf, size_t numBytes) const {
auto& httpFileInfo = ku_dynamic_cast<FileInfo&, HTTPFileInfo&>(fileInfo);
auto& httpFileInfo = fileInfo.constCast<HTTPFileInfo>();
if (httpFileInfo.cachedFileInfo != nullptr) {
return httpFileInfo.cachedFileInfo->readFile(buf, numBytes);
}
auto maxNumBytesToRead = httpFileInfo.length - httpFileInfo.fileOffset;
numBytes = std::min<uint64_t>(maxNumBytesToRead, numBytes);
if (httpFileInfo.fileOffset > httpFileInfo.getFileSize()) {
Expand All @@ -228,14 +237,21 @@ void HTTPFileSystem::syncFile(const common::FileInfo&) const {
throw NotImplementedException("syncFile is not supported in HTTPFileSystem");
}

int64_t HTTPFileSystem::seek(common::FileInfo& fileInfo, uint64_t offset, int /*whence*/) const {
auto& httpFileInfo = ku_dynamic_cast<FileInfo&, HTTPFileInfo&>(fileInfo);
// Moves the read position of an HTTP-backed file.
//
// When a cached copy is attached (cachedFileInfo is non-null) the request, including whence, is
// forwarded to that cached file and its result is returned. Otherwise offset is stored verbatim
// in fileOffset and returned; whence is not consulted on that path.
//
// fileInfo: file handle; it is cast to HTTPFileInfo.
// offset:   requested offset.
// whence:   seek origin, only forwarded to the cached file.
int64_t HTTPFileSystem::seek(common::FileInfo& fileInfo, uint64_t offset, int whence) const {
    auto& httpFileInfo = fileInfo.cast<HTTPFileInfo>();
    if (httpFileInfo.cachedFileInfo != nullptr) {
        // Propagate the position reported by the cached file rather than echoing `offset`:
        // for a relative whence the resulting position is generally not equal to `offset`.
        // NOTE(review): assumes FileInfo::seek returns the resulting position as int64_t --
        // confirm against file_info.h.
        return httpFileInfo.cachedFileInfo->seek(offset, whence);
    }
    httpFileInfo.fileOffset = offset;
    return offset;
}
}

// Reports the size of an HTTP-backed file.
// If a cached copy is attached (cachedFileInfo is non-null), the size reported by that cached
// file is returned; otherwise the length field of the HTTPFileInfo is returned.
uint64_t HTTPFileSystem::getFileSize(const common::FileInfo& fileInfo) const {
// NOTE(review): the two declarations below appear to be the before/after forms of the same
// line as rendered by the commit diff -- confirm that only the constCast form remains.
auto& httpFileInfo = ku_dynamic_cast<const FileInfo&, const HTTPFileInfo&>(fileInfo);
auto& httpFileInfo = fileInfo.constCast<HTTPFileInfo>();
// Prefer the cached file's size when a cached copy is attached.
if (httpFileInfo.cachedFileInfo != nullptr) {
return httpFileInfo.cachedFileInfo->getFileSize();
}
return httpFileInfo.length;
}

Expand Down
2 changes: 2 additions & 0 deletions extension/httpfs/src/include/httpfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ struct HTTPFileInfo : public common::FileInfo {

virtual void initializeClient();

void initMetadata();

// We keep an HTTP client stored for connection reuse with keep-alive headers.
std::unique_ptr<httplib::Client> httpClient;

Expand Down
7 changes: 4 additions & 3 deletions extension/httpfs/src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ S3FileInfo::S3FileInfo(std::string path, common::FileSystem* fileSystem, int fla

S3FileInfo::~S3FileInfo() {
auto s3FS = fileSystem->ptrCast<S3FileSystem>();
if ((((flags & O_ACCMODE) & O_WRONLY)) && !uploadFinalized) {
if ((flags & FileFlags::WRITE) && !uploadFinalized) {
s3FS->flushAllBuffers(this);
if (numPartsUploaded) {
s3FS->finalizeMultipartUpload(this);
Expand All @@ -63,7 +63,7 @@ S3FileInfo::~S3FileInfo() {
void S3FileInfo::initialize(main::ClientContext* context) {
HTTPFileInfo::initialize(context);
auto s3FS = fileSystem->constPtrCast<S3FileSystem>();
if ((flags & O_ACCMODE) & O_WRONLY) {
if (flags & FileFlags::WRITE) {
auto maxNumParts = uploadParams.maxNumPartsPerFile;
auto requiredPartSize = uploadParams.maxFileSize / maxNumParts;
partSize = std::max<uint64_t>(AWS_MINIMUM_PART_SIZE, requiredPartSize);
Expand Down Expand Up @@ -454,6 +454,7 @@ ParsedS3URL S3FileSystem::parseS3URL(std::string url, S3AuthParams& params) {

std::string S3FileSystem::initializeMultiPartUpload(S3FileInfo* fileInfo) const {
// The AWS response is roughly 300 characters per the docs, so this buffer should be large enough to avoid a resize.
fileInfo->initMetadata();
auto responseBufferLen = DEFAULT_RESPONSE_BUFFER_SIZE;
auto responseBuffer = std::make_unique<uint8_t[]>(responseBufferLen);
std::string queryParam = "uploads=";
Expand All @@ -466,7 +467,7 @@ std::string S3FileSystem::initializeMultiPartUpload(S3FileInfo* fileInfo) const
void S3FileSystem::writeFile(common::FileInfo& fileInfo, const uint8_t* buffer, uint64_t numBytes,
uint64_t offset) const {
auto s3FileInfo = fileInfo.ptrCast<S3FileInfo>();
if (!((s3FileInfo->flags & O_ACCMODE) & O_WRONLY)) {
if (!(s3FileInfo->flags & FileFlags::WRITE)) {
throw IOException("Write called on a file which is not open in write mode.");
}
uint64_t numBytesWritten = 0;
Expand Down
10 changes: 10 additions & 0 deletions src/include/common/file_system/file_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ struct KUZU_API FileInfo {
return common::ku_dynamic_cast<const FileInfo*, const TARGET*>(this);
}

// Returns this object viewed as a const reference to TARGET, converting through
// common::ku_dynamic_cast.
template<typename TARGET>
const TARGET& constCast() const {
    const FileInfo& self = *this;
    return common::ku_dynamic_cast<const FileInfo&, const TARGET&>(self);
}

template<class TARGET>
TARGET& cast() {
return common::ku_dynamic_cast<FileInfo&, TARGET&>(*this);
}

const std::string path;

FileSystem* fileSystem;
Expand Down

0 comments on commit d83daf3

Please sign in to comment.