Skip to content

Commit

Permalink
Merge pull request #346 from BigVan/main
Browse files Browse the repository at this point in the history
[feat.] a new tool 'overlaybd-merge'
  • Loading branch information
liulanzheng authored Aug 21, 2024
2 parents 1059cf4 + 99b3160 commit e13a27d
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/image_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <photon/fs/aligned-file.h>
#include <photon/fs/localfs.h>
#include "overlaybd/lsmt/file.h"
#include "overlaybd/lsmt/index.h"
#include "overlaybd/zfile/zfile.h"
#include "config.h"
#include "image_file.h"
Expand Down Expand Up @@ -515,6 +516,10 @@ int ImageFile::init_image_file() {
return -1;
}

// Flatten the image's underlying file into `as`.
// m_file is treated as an LSMT::IFileRO and the work is delegated to its
// flatten(); the value returned by flatten() is passed back unchanged.
int ImageFile::compact(IFile *as) {
    auto *lsmt_file = (LSMT::IFileRO *)m_file;
    return lsmt_file->flatten(as);
}

void ImageFile::set_auth_failed() {
if (m_status == 0) // only set exit in image boot phase
{
Expand Down
4 changes: 4 additions & 0 deletions src/image_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class ImageFile : public photon::fs::ForwardFile {
return m_file;
}

// Flatten the underlying LSMT file into `as`.
// The definition forwards to LSMT::IFileRO::flatten() on m_file and returns its result.
int compact(IFile *as);

private:
Prefetcher *m_prefetcher = nullptr;
ImageConfigNS::ImageConfig conf;
Expand All @@ -126,5 +128,7 @@ class ImageFile : public photon::fs::ForwardFile {
IFile *__open_ro_target_file(const std::string &);
IFile *__open_ro_remote(const std::string &dir, const std::string &, const uint64_t, int);
IFile *__open_ro_target_remote(const std::string &dir, const std::string &, const uint64_t, int);

// size_t seek_data(off_t begin, off_t end);
void start_bk_dl_thread();
};
1 change: 1 addition & 0 deletions src/image_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class ImageService {
// bool enable_acceleration(GlobalFs *global_fs, ImageConfigNS::P2PConfig conf);
bool enable_acceleration();


ImageConfigNS::GlobalConfig global_conf;
struct GlobalFs global_fs;
std::unique_ptr<OverlayBDMetric> metrics;
Expand Down
39 changes: 38 additions & 1 deletion src/overlaybd/lsmt/file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ struct HeaderTrailer {
} __attribute__((packed));

class LSMTReadOnlyFile;
static int merge_files_ro(vector<IFile *> files, const CommitArgs &args);
static LSMTReadOnlyFile *open_file_ro(IFile *file, bool ownership, bool reserve_tag);
static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false,
ssize_t st_size = -1);
Expand Down Expand Up @@ -642,6 +643,33 @@ class LSMTReadOnlyFile : public IFileRW {
ret.valid_data_size = size;
return ret;
}

virtual ssize_t seek_data(off_t begin, off_t end, vector<Segment> &segs) override {

begin /= ALIGNMENT;
end /= ALIGNMENT;
while (begin < end) {
SegmentMapping mappings[128];
auto length = (end - begin < Segment::MAX_LENGTH ? end - begin : Segment::MAX_LENGTH);
Segment s{(uint64_t)begin, (uint32_t)length};
auto find = m_index->lookup(s, mappings, 128);
if (find == 0) {
begin+=length;
continue;
}
segs.insert(segs.end(), mappings, mappings + find);
begin=mappings[find-1].end();

}
return segs.size();
}

// Merge all layer files of this read-only file into the single file `as`.
// The layer list m_files is handed to merge_files_ro() in reverse order,
// together with commit arguments built from `as`. Returns whatever
// merge_files_ro() returns.
virtual int flatten(IFile *as) override {
    vector<IFile *> reversed_layers(m_files.rbegin(), m_files.rend());
    CommitArgs commit_args(as);
    return merge_files_ro(reversed_layers, commit_args);
}
};

class LSMTFile : public LSMTReadOnlyFile {
Expand Down Expand Up @@ -956,6 +984,15 @@ class LSMTFile : public LSMTReadOnlyFile {
data_stat.valid_data_size);
return data_stat;
}

// Flatten this writable file (its own index plus any backing layers) into the
// single file `as`.
//
// A read-only snapshot of the current index is taken with
// make_read_only_index() and fed, together with the layer files and the
// virtual size, to compact().
//
// Returns the result of compact(), or -1 when no read-only index could be
// produced.
virtual int flatten(IFile *as) override {
    // make_read_only_index() is declared on IMemoryIndex and yields an
    // IMemoryIndex*. Only buffer() and size() are needed, both of which are
    // available on that interface, so own it as such rather than force-casting
    // to IComboIndex (the object returned is not required to be one).
    unique_ptr<IMemoryIndex> pmi(m_index->make_read_only_index());
    if (!pmi) {
        // The index could not provide a read-only snapshot (e.g. an
        // unimplemented make_read_only_index()); do not dereference null.
        return -1;
    }
    CommitArgs args(as);
    // compact() requires a progress/size counter argument; its value is not
    // consumed here.
    atomic_uint64_t _no_use_var(0);
    CompactOptions opts(&m_files, (SegmentMapping *)(pmi->buffer()), pmi->size(), m_vsize, &args);
    return compact(opts, _no_use_var);
}
};
class LSMTSparseFile : public LSMTFile {
public:
Expand Down Expand Up @@ -1677,7 +1714,7 @@ IFileRO *open_files_ro(IFile **files, size_t n, bool ownership) {
return rst;
}

int merge_files_ro(vector<IFile *> files, const CommitArgs &args) {
static int merge_files_ro(vector<IFile *> files, const CommitArgs &args) {
uint64_t vsize;
vector<UUID> files_uuid(files.size());
auto pmi = unique_ptr<IMemoryIndex>(load_merge_index(files, files_uuid, vsize));
Expand Down
7 changes: 6 additions & 1 deletion src/overlaybd/lsmt/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class IFileRO : public photon::fs::VirtualReadOnlyFile {
virtual int get_uuid(UUID &out, size_t layer_idx = 0) const = 0;

virtual std::vector<IFile *> get_lower_files() const = 0;

// Append to `segs` the segments found in the byte range [begin, end).
// As implemented by LSMTReadOnlyFile: begin/end are divided by ALIGNMENT before
// the index lookup, and the return value is segs.size() after the call.
// The test suite multiplies the resulting offset/length by ALIGNMENT to get bytes.
virtual ssize_t seek_data(off_t begin, off_t end, std::vector<Segment> &segs) = 0;

// Flatten the content of this (possibly multi-layer) file into the single file `as`.
// The test suite treats a return value of 0 as success.
virtual int flatten(IFile *as) = 0;

};

struct CommitArgs {
Expand All @@ -70,7 +75,6 @@ struct CommitArgs {
class IFileRW : public IFileRO {
public:
virtual IMemoryIndex0 *index() const override = 0;

const int Index_Group_Commit = 10;

static const int RemoteData = 11;
Expand All @@ -96,6 +100,7 @@ class IFileRW : public IFileRO {
uint64_t valid_data_size = -1; // size of valid data (excluding garbage)
};
virtual DataStat data_stat() const = 0;

};

// create a new writable LSMT file constituted by a data file and an index file,
Expand Down
19 changes: 19 additions & 0 deletions src/overlaybd/lsmt/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ class Index : public IMemoryIndex {
uint64_t vsize() const override {
return virtual_size;
}

// Index provides no real make_read_only_index(); the override is a stub generated by
// UNIMPLEMENTED_POINTER (presumably returning nullptr — callers should check the result).
UNIMPLEMENTED_POINTER(IMemoryIndex *make_read_only_index() const override);
};

class LevelIndex : public Index {
Expand Down Expand Up @@ -254,6 +256,8 @@ class Index0 : public IComboIndex {
}
} alloc_blk;

// Index0(const set<SegmentMapping> &mapping) : mapping(mapping){};

Index0(const SegmentMapping *pmappings = nullptr, size_t n = 0) {
if (pmappings == nullptr)
return;
Expand Down Expand Up @@ -505,6 +509,21 @@ class ComboIndex : public Index0 {
merge_indexes(0, mappings, indexes, 2, 0, UINT64_MAX, false, max_level);
return new Index(std::move(mappings));
}

// Build a standalone read-only Index reflecting the current state of this
// combo index.
//
// The level-0 mappings are first copied into a fresh Index (ownership flag
// cleared). Without a backing index that copy is returned directly; otherwise
// it is merged with the backing index via merge_indexes() and a new Index
// holding the merged mappings is returned (the temporary copy is deleted).
// The returned object is allocated with `new`.
virtual Index *make_read_only_index() const override {
    auto level0_copy = new Index;
    level0_copy->ownership = false;
    level0_copy->assign(mapping.begin(), mapping.end());
    if (!m_backing_index) {
        return level0_copy;
    }
    // Level 0 comes first, the backing index second.
    const Index *layers[2] = {level0_copy, const_cast<Index *>(m_backing_index)};
    vector<SegmentMapping> merged;
    merge_indexes(0, merged, layers, 2, 0, UINT64_MAX, false, 2);
    delete level0_copy;
    return new Index(std::move(merged));
}

};

//======================== END OF ComboIndex =============================//
Expand Down
6 changes: 5 additions & 1 deletion src/overlaybd/lsmt/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class IMemoryIndex {
virtual uint64_t block_count() const = 0;

virtual uint64_t vsize() const = 0;

// Create a read-only index representing the current content of this index.
// ComboIndex returns a newly allocated Index; Index itself only provides an
// UNIMPLEMENTED_POINTER stub.
virtual IMemoryIndex *make_read_only_index() const = 0;
};

// the level 0 memory index, which supports write
Expand All @@ -139,7 +141,7 @@ class IMemoryIndex0 : public IMemoryIndex {
// dump the whole index as an array
// memory allocation is aligned to the `alignment`
virtual SegmentMapping *dump(size_t alignment = 0) const = 0;
virtual IMemoryIndex *make_read_only_index() const = 0;
// virtual IMemoryIndex *make_read_only_index() const = 0;
};

class IComboIndex : public IMemoryIndex0 {
Expand All @@ -153,6 +155,8 @@ class IComboIndex : public IMemoryIndex0 {
// and then clear the original index0.
// virtual IMemoryIndex0* gc_index() = 0;
virtual IMemoryIndex *load_range_index(int, int) const = 0;


};

// create writable level 0 memory index from an array of mappings;
Expand Down
48 changes: 46 additions & 2 deletions src/overlaybd/lsmt/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,26 +584,70 @@ TEST_F(FileTest3, stack_files) {
cout << "merging RO layers as " << fn_merged << endl;
auto merged = lfs->open(fn_merged, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU);
merge_files_ro(files, FLAGS_layers, merged);
/*auto mergedro =*/::open_file_ro(merged, true);
auto mergedro =::open_file_ro(merged, true);
cout << "verifying merged RO layers file" << endl;
verify_file(mergedro);
delete mergedro;
cout << "verifying stacked RO layers file" << endl;
auto lower = open_files_ro(files, FLAGS_layers);
verify_file(lower);
((LSMTReadOnlyFile *)lower)->m_index =
create_level_index(lower->index()->buffer(), lower->index()->size(), 0, UINT64_MAX, false);
EXPECT_EQ(((LSMTReadOnlyFile *)lower)->close_seal(), -1);
CommitArgs _(nullptr);
EXPECT_EQ(((LSMTReadOnlyFile *)lower)->commit(_), -1);
auto stat = ((LSMTReadOnlyFile *)lower)->data_stat();
LOG_INFO("RO valid data: `", stat.valid_data_size);
merged = lfs->open(fn_merged, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU);
EXPECT_EQ(lower->flatten(merged), 0);
cout << "verifying flattened layer of lowers" << endl;
verify_file(fn_merged);
delete merged;
cout << "generating a RW layer by randwrite()" << endl;
auto upper = create_file_rw();
auto file = stack_files(upper, lower, 0, true);
randwrite(file, FLAGS_nwrites);
verify_file(file);
cout << "verifying flattened layer of stacked layers" << endl;

merged = lfs->open(fn_merged, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU);
file->flatten(merged);
verify_file(fn_merged);
delete file;
}

// Exercises seek_data() on a stacked file: every segment it reports is read
// from the stacked file and written at the same position into a fresh RW file,
// which is then checked with verify_file().
TEST_F(FileTest3, seek_data) {
    CleanUp();
    cout << "generating " << FLAGS_layers << " RO layers by randwrite()" << endl;
    for (int layer = 0; layer < FLAGS_layers; ++layer) {
        files[layer] = create_commit_layer(0, ut_io_engine);
    }

    auto lower = open_files_ro(files, FLAGS_layers);
    verify_file(lower);
    // Swap the lower file's index for a level index built from its current mappings.
    auto lower_ro = (LSMTReadOnlyFile *)lower;
    lower_ro->m_index =
        create_level_index(lower->index()->buffer(), lower->index()->size(), 0, UINT64_MAX, false);

    cout << "generating a RW layer by randwrite()" << endl;
    auto upper = create_file_rw();
    auto file = stack_files(upper, lower, 0, true);
    randwrite(file, FLAGS_nwrites);
    verify_file(file);

    auto fmerged = create_file_rw();
    vector<Segment> segs;
    auto data = new char[8<<20];
    file->seek_data(0, vsize, segs);
    LOG_INFO("flattern segments count: `", segs.size());
    // Segment offset/length are in ALIGNMENT units; scale them to bytes for I/O.
    for (const auto &seg : segs) {
        const size_t nbytes = seg.length*ALIGNMENT;
        const off_t pos = seg.offset * ALIGNMENT;
        auto readn = file->pread(data, nbytes, pos);
        EXPECT_EQ(readn, fmerged->pwrite(data, readn, pos));
    }
    verify_file(fmerged);

    delete file;
    delete fmerged;
    delete[] data;
}


TEST_F(FileTest3, sparsefile_close_seal) {
CleanUp();
cout << "generating " << FLAGS_layers << " RO layers by randwrite()" << endl;
Expand Down
32 changes: 30 additions & 2 deletions src/overlaybd/zfile/zfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "compressor.h"
#include <atomic>
#include <thread>
#include "photon/thread/thread11.h"

using namespace photon::fs;

Expand Down Expand Up @@ -938,6 +939,16 @@ class ZFileBuilderMP : public ZFileBuilderBase {
UNIMPLEMENTED(int fstat(struct stat *buf) override);
};

// Read exactly `length` bytes at `offset` from `file` into `buf`.
//
// The outcome is reported twice: through the return value and through
// `*result`. This lets a caller that runs this function in a separate thread
// pick up the status afterwards.
//   success    : *result = 0,  returns 0
//   short/fail : *result = -1, returns -1 (after logging the error)
ssize_t read_chunk(IFile *file, char *buf, off_t offset, size_t length, ssize_t *result) {
    LOG_DEBUG("read zfile index chunk {offset: `, len: `}", offset, length);
    const ssize_t nread = file->pread(buf, length, offset);
    const bool complete = (nread == (ssize_t)length);
    if (!complete) {
        *result = -1;
        LOG_ERRNO_RETURN(0, -1, "failed to read index chunk {offset: `, len: `}.", offset, length);
    }
    *result = 0;
    return 0;
}

bool load_jump_table(IFile *file, CompressionFile::HeaderTrailer *pheader_trailer,
CompressionFile::JumpTable &jump_table, bool trailer = true) {
char buf[CompressionFile::HeaderTrailer::SPACE];
Expand Down Expand Up @@ -994,8 +1005,25 @@ bool load_jump_table(IFile *file, CompressionFile::HeaderTrailer *pheader_traile
}
auto ibuf = std::unique_ptr<uint32_t[]>(new uint32_t[pht->index_size]);
LOG_DEBUG("index_offset: `", pht->index_offset);
ret = file->pread((void *)(ibuf.get()), index_bytes, pht->index_offset);
if (ret < (ssize_t)index_bytes) {

size_t delta = 1UL<<20;
std::vector<photon::join_handle*> ths;
ssize_t *r = new ssize_t[index_bytes / delta + 1]{};
DEFER(delete []r);
int idx = 0;
for (off_t offset = 0; offset < (off_t)index_bytes; offset += delta) {
size_t chunk_size = std::min(index_bytes - offset, delta);
auto th = photon::thread_create11(&ZFile::read_chunk, file, (char*)ibuf.get() + offset, pht->index_offset + offset, chunk_size, &r[idx++]);
ths.push_back(photon::thread_enable_join(th));
}
ret = 0;
for (int i =0; i<idx; i++){
photon::thread_join(ths[i]);
if (r[i]!=0) {
ret = -1;
}
}
if (ret != 0) {
LOG_ERRNO_RETURN(0, false, "failed to read index");
}
if (pht->is_digest_enabled()) {
Expand Down
7 changes: 7 additions & 0 deletions src/tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ add_executable(overlaybd-commit overlaybd-commit.cpp)
target_include_directories(overlaybd-commit PUBLIC ${PHOTON_INCLUDE_DIR})
target_link_libraries(overlaybd-commit photon_static overlaybd_image_lib)

# overlaybd-merge tool: built from overlaybd-merge.cpp, using the Photon headers
# and linked against photon_static and overlaybd_image_lib.
add_executable(overlaybd-merge overlaybd-merge.cpp)
target_include_directories(overlaybd-merge PUBLIC ${PHOTON_INCLUDE_DIR})
target_link_libraries(overlaybd-merge photon_static overlaybd_image_lib)


add_executable(overlaybd-create overlaybd-create.cpp)
target_include_directories(overlaybd-create PUBLIC ${PHOTON_INCLUDE_DIR})
target_link_libraries(overlaybd-create photon_static overlaybd_lib)
Expand Down Expand Up @@ -30,6 +35,8 @@ install(TARGETS
overlaybd-create
overlaybd-zfile
overlaybd-apply
overlaybd-merge

turboOCI-apply
DESTINATION /opt/overlaybd/bin
)
Loading

0 comments on commit e13a27d

Please sign in to comment.