Skip to content

Commit

Permalink
feat(c-api): Allow to track PutCF/DeleteCf operations on WriteBatch::…
Browse files Browse the repository at this point in the history
…Handler
  • Loading branch information
ovr committed Aug 4, 2023
1 parent 3f7c92b commit add8945
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
63 changes: 63 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2123,16 +2123,75 @@ void rocksdb_writebatch_put_log_data(rocksdb_writebatch_t* b, const char* blob,
class H : public WriteBatch::Handler {
public:
void* state_;

void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen);
void (*put_cf_)(void*, uint32_t column_family_id, const char* k, size_t klen, const char* v, size_t vlen);
void (*deleted_)(void*, const char* k, size_t klen);
void (*deleted_cf_)(void*, uint32_t column_family_id, const char* k, size_t klen);

void Put(const Slice& key, const Slice& value) override {
(*put_)(state_, key.data(), key.size(), value.data(), value.size());
}
void Delete(const Slice& key) override {
(*deleted_)(state_, key.data(), key.size());
}
Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
if (deleted_cf_) {
(*deleted_cf_)(state_, column_family_id, key.data(), key.size());
return Status::OK();
}

return Status::OK();
}
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (put_cf_) {
(*put_cf_)(state_, column_family_id, key.data(), key.size(), value.data(), value.size());
return Status::OK();
}

return Status::OK();
}
};

struct rocksdb_writebatch_handler_t {
H rep;
};

rocksdb_writebatch_handler_t* rocksdb_writebatch_new_handler(
void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen)
) {
H handler;
handler.state_ = state;
handler.put_ = put;
handler.deleted_ = deleted;

rocksdb_writebatch_handler_t* h = new rocksdb_writebatch_handler_t;
h->rep = handler;

return h;
}

void rocksdb_writebatch_handler_set_put_cf(
rocksdb_writebatch_handler_t* h,
void (*put_cf)(void*, uint32_t column_family_id, const char* k, size_t klen, const char* v, size_t vlen)
) {
h->rep.put_cf_ = put_cf;
}

void rocksdb_writebatch_handler_set_delete_cf(
rocksdb_writebatch_handler_t* h,
void (*delete_cf)(void*, uint32_t column_family_id, const char* k, size_t klen)
) {
h->rep.deleted_cf_ = delete_cf;
}

void rocksdb_writebatch_handler_destroy(rocksdb_writebatch_handler_t* h) {
delete h;
}

void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state,
void (*put)(void*, const char* k, size_t klen,
const char* v, size_t vlen),
Expand All @@ -2145,6 +2204,10 @@ void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state,
b->rep.Iterate(&handler);
}

void rocksdb_writebatch_iterate_with_handler(rocksdb_writebatch_t* b, rocksdb_writebatch_handler_t* h) {
b->rep.Iterate(&h->rep);
}

const char* rocksdb_writebatch_data(rocksdb_writebatch_t* b, size_t* size) {
*size = b->rep.GetDataSize();
return b->rep.Data().c_str();
Expand Down
54 changes: 54 additions & 0 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,36 @@ static void CheckDel(void* ptr, const char* k, size_t klen) {
(*state)++;
}

// Callback from rocksdb_writebatch_iterate()
static void CheckPutCf(void* ptr, uint32_t column_family_id, const char* k, size_t klen, const char* v,
size_t vlen) {
CheckCondition(column_family_id == 2);

int* state = (int*)ptr;
CheckCondition(*state < 4);
switch (*state) {
case 0:
CheckEqual("bar", k, klen);
CheckEqual("b", v, vlen);
break;
case 1:
CheckEqual("box", k, klen);
CheckEqual("c", v, vlen);
break;
}
(*state)++;
}

// Callback from rocksdb_writebatch_iterate()
static void CheckDelCf(void* ptr, uint32_t column_family_id, const char* k, size_t klen) {
CheckCondition(column_family_id == 2);

int* state = (int*)ptr;
CheckCondition(*state == 2);
CheckEqual("bar", k, klen);
(*state)++;
}

static void CmpDestroy(void* arg) { (void)arg; }

static int CmpCompare(void* arg, const char* a, size_t alen, const char* b,
Expand Down Expand Up @@ -979,6 +1009,30 @@ int main(int argc, char** argv) {
rocksdb_writebatch_destroy(wb);
}

StartPhase("writebatch_iterator_handler");
{
rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
rocksdb_writebatch_put(wb, "foo", 3, "a", 1);
rocksdb_writebatch_clear(wb);
rocksdb_writebatch_put(wb, "bar", 3, "b", 1);
rocksdb_writebatch_put(wb, "box", 3, "c", 1);
rocksdb_writebatch_delete(wb, "bar", 3);

rocksdb_write(db, woptions, wb, &err);
CheckNoError(err);

int pos = 0;
rocksdb_writebatch_handler_t* h = rocksdb_writebatch_new_handler(&pos, CheckPut, CheckDel);
rocksdb_writebatch_handler_set_put_cf(h, CheckPutCf);
rocksdb_writebatch_handler_set_delete_cf(h, CheckDelCf);

rocksdb_writebatch_iterate_with_handler(wb, h);
CheckCondition(pos == 3);

rocksdb_writebatch_handler_destroy(h);
rocksdb_writebatch_destroy(wb);
}

StartPhase("writebatch_vectors");
{
rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
Expand Down
16 changes: 16 additions & 0 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ typedef struct rocksdb_slicetransform_t rocksdb_slicetransform_t;
typedef struct rocksdb_snapshot_t rocksdb_snapshot_t;
typedef struct rocksdb_writablefile_t rocksdb_writablefile_t;
typedef struct rocksdb_writebatch_t rocksdb_writebatch_t;
typedef struct rocksdb_writebatch_handler_t rocksdb_writebatch_handler_t;
typedef struct rocksdb_writebatch_wi_t rocksdb_writebatch_wi_t;
typedef struct rocksdb_writeoptions_t rocksdb_writeoptions_t;
typedef struct rocksdb_universal_compaction_options_t
Expand Down Expand Up @@ -816,6 +817,21 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_iterate(
rocksdb_writebatch_t*, void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen));
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_iterate_with_handler(rocksdb_writebatch_t* b, rocksdb_writebatch_handler_t* h);
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_handler_t* rocksdb_writebatch_new_handler(
void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen)
);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_handler_set_put_cf(
rocksdb_writebatch_handler_t* h,
void (*put_cf)(void*, uint32_t column_family_id, const char* k, size_t klen, const char* v, size_t vlen)
);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_handler_set_delete_cf(
rocksdb_writebatch_handler_t* h,
void (*delete_cf)(void*, uint32_t column_family_id, const char* k, size_t klen)
);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_handler_destroy(rocksdb_writebatch_handler_t* h);
extern ROCKSDB_LIBRARY_API const char* rocksdb_writebatch_data(
rocksdb_writebatch_t*, size_t* size);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_set_save_point(
Expand Down

0 comments on commit add8945

Please sign in to comment.