Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: use shared ArrayBuffer for nextv() #44

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 75 additions & 17 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,35 @@ struct Entry {
std::string value_;
};

struct PoolEntry {
PoolEntry (size_t keySize, size_t valueSize)
: keySize_(keySize),
valueSize_(valueSize) {}

void ConvertByMode (napi_env env, Mode mode, napi_value& result) const {
if (mode == Mode::entries) {
napi_create_array_with_length(env, 2, &result);

napi_value keyElement;
napi_value valueElement;

napi_create_int32(env, keySize_, &keyElement);
napi_create_int32(env, valueSize_, &valueElement);

napi_set_element(env, result, 0, keyElement);
napi_set_element(env, result, 1, valueElement);
} else if (mode == Mode::keys) {
napi_create_int32(env, keySize_, &result);
} else {
napi_create_int32(env, valueSize_, &result);
}
}

private:
size_t keySize_;
size_t valueSize_;
};

/**
* Base worker class. Handles the async work. Derived classes can override the
* following virtual methods (listed in the order in which they're called):
Expand Down Expand Up @@ -859,11 +888,12 @@ struct Iterator final : public BaseIterator {
if (ref_ != NULL) napi_delete_reference(env, ref_);
}

bool ReadMany (uint32_t size) {
bool ReadMany (uint32_t size, std::vector<char>* pool) {
cache_.clear();
cache_.reserve(size);
size_t bytesRead = 0;
leveldb::Slice empty;
pool->reserve(highWaterMarkBytes_);

while (true) {
if (!first_) Next();
Expand All @@ -874,16 +904,28 @@ struct Iterator final : public BaseIterator {
if (keys_ && values_) {
leveldb::Slice k = CurrentKey();
leveldb::Slice v = CurrentValue();
cache_.emplace_back(k, v);
bytesRead += k.size() + v.size();

pool->resize(bytesRead + k.size() + v.size());

memcpy((&((*pool)[0])) + bytesRead, k.data(), k.size());
bytesRead += k.size();

memcpy((&((*pool)[0])) + bytesRead, v.data(), v.size());
bytesRead += v.size();

cache_.emplace_back(k.size(), v.size());
} else if (keys_) {
leveldb::Slice k = CurrentKey();
cache_.emplace_back(k, empty);
pool->resize(bytesRead + k.size());
memcpy((&((*pool)[0])) + bytesRead, k.data(), k.size());
bytesRead += k.size();
cache_.emplace_back(k.size(), 0);
} else if (values_) {
leveldb::Slice v = CurrentValue();
cache_.emplace_back(empty, v);
pool->resize(bytesRead + v.size());
memcpy((&((*pool)[0])) + bytesRead, v.data(), v.size());
bytesRead += v.size();
cache_.emplace_back(0, v.size());
}

if (bytesRead > highWaterMarkBytes_ || cache_.size() >= size) {
Expand All @@ -904,7 +946,7 @@ struct Iterator final : public BaseIterator {
bool nexting_;
bool isClosing_;
BaseWorker* closeWorker_;
std::vector<Entry> cache_;
std::vector<PoolEntry> cache_;

private:
napi_ref ref_;
Expand Down Expand Up @@ -1726,6 +1768,13 @@ NAPI_METHOD(iterator_close) {
NAPI_RETURN_UNDEFINED();
}

void FinalizeArrayBuffer (napi_env env, void* data, void* hint) {
if (data) {
// TODO: segv
// delete (std::vector<char>*) data;
}
}

/**
* Worker class for nexting an iterator.
*/
Expand All @@ -1736,16 +1785,22 @@ struct NextWorker final : public BaseWorker {
napi_value callback)
: BaseWorker(env, iterator->database_, callback,
"classic_level.iterator.next"),
iterator_(iterator), size_(size), ok_() {}
iterator_(iterator), size_(size), ok_() {
pool_ = new std::vector<char>();
}

~NextWorker () {}
~NextWorker () {
if (pool_) {
delete pool_;
}
}

void DoExecute () override {
if (!iterator_->DidSeek()) {
iterator_->SeekToRange();
}

ok_ = iterator_->ReadMany(size_);
ok_ = iterator_->ReadMany(size_, pool_);

if (!ok_) {
SetStatus(iterator_->Status());
Expand All @@ -1757,20 +1812,22 @@ struct NextWorker final : public BaseWorker {
napi_value jsArray;
napi_create_array_with_length(env, size, &jsArray);

const Encoding ke = iterator_->keyEncoding_;
const Encoding ve = iterator_->valueEncoding_;
// const Encoding ke = iterator_->keyEncoding_;
// const Encoding ve = iterator_->valueEncoding_;

for (uint32_t idx = 0; idx < size; idx++) {
napi_value element;
iterator_->cache_[idx].ConvertByMode(env, Mode::entries, ke, ve, element);
iterator_->cache_[idx].ConvertByMode(env, Mode::entries, element);
napi_set_element(env, jsArray, idx, element);
}

napi_value argv[3];
napi_value argv[4];
napi_get_null(env, &argv[0]);
argv[1] = jsArray;
napi_get_boolean(env, !ok_, &argv[2]);
CallFunction(env, callback, 3, argv);
napi_create_external_arraybuffer(env, &((*pool_)[0]), pool_->size(), FinalizeArrayBuffer, NULL, &argv[1]);
pool_ = NULL; // No longer owned by us
argv[2] = jsArray;
napi_get_boolean(env, !ok_, &argv[3]);
CallFunction(env, callback, 4, argv);
}

void DoFinally (napi_env env) override {
Expand All @@ -1787,7 +1844,8 @@ struct NextWorker final : public BaseWorker {

private:
Iterator* iterator_;
uint32_t size_;
std::vector<char>* pool_;
const uint32_t size_;
bool ok_;
};

Expand Down
6 changes: 3 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class ClassicLevel extends AbstractLevel {

super({
encodings: {
buffer: true,
utf8: true,
view: true
buffer: true
// utf8: true,
// view: true
},
seek: true,
createIfMissing: true,
Expand Down
54 changes: 42 additions & 12 deletions iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ const { AbstractIterator } = require('abstract-level')
const binding = require('./binding')

const kContext = Symbol('context')
const kCache = Symbol('cache')
const kPool = Symbol('pool')
const kLengths = Symbol('lengths')
const kOffset = Symbol('offset')
const kAsEntry = Symbol('asEntry')
const kFinished = Symbol('finished')
const kFirst = Symbol('first')
const kPosition = Symbol('position')
const kHandleNext = Symbol('handleNext')
const kHandleNextv = Symbol('handleNextv')
const kCallback = Symbol('callback')
const empty = []
const emptyBuffer = Buffer.alloc(0)

// Does not implement _all() because the default implementation
// of abstract-level falls back to nextv(1000) and using all()
Expand All @@ -23,28 +27,37 @@ class Iterator extends AbstractIterator {
super(db, options)

this[kContext] = binding.iterator_init(context, options)
this[kAsEntry] = options.keys && options.values
this[kHandleNext] = this[kHandleNext].bind(this)
this[kHandleNextv] = this[kHandleNextv].bind(this)
this[kCallback] = null
this[kFirst] = true
this[kCache] = empty
this[kPool] = null
this[kLengths] = empty
this[kOffset] = 0
this[kFinished] = false
this[kPosition] = 0
}

_seek (target, options) {
this[kFirst] = true
this[kCache] = empty
this[kPool] = null
this[kLengths] = empty
this[kOffset] = 0
this[kFinished] = false
this[kPosition] = 0

binding.iterator_seek(this[kContext], target)
}

_next (callback) {
if (this[kPosition] < this[kCache].length) {
const entry = this[kCache][this[kPosition]++]
process.nextTick(callback, null, entry[0], entry[1])
if (this[kPosition] < this[kLengths].length) {
const [keyLength, valueLength] = this[kLengths][this[kPosition]++]
const key = Buffer.from(this[kPool], this[kOffset], keyLength)
this[kOffset] += keyLength
const value = Buffer.from(this[kPool], this[kOffset], valueLength)
this[kOffset] += valueLength
process.nextTick(callback, null, key, value)
} else if (this[kFinished]) {
process.nextTick(callback)
} else {
Expand All @@ -62,11 +75,13 @@ class Iterator extends AbstractIterator {
}
}

[kHandleNext] (err, items, finished) {
[kHandleNext] (err, pool, lengths, finished) {
const callback = this[kCallback]
if (err) return callback(err)

this[kCache] = items
this[kPool] = pool
this[kLengths] = lengths
this[kOffset] = 0
this[kFinished] = finished
this[kPosition] = 0

Expand All @@ -83,23 +98,38 @@ class Iterator extends AbstractIterator {
}
}

[kHandleNextv] (err, items, finished) {
[kHandleNextv] (err, pool, lengths, finished) {
const callback = this[kCallback]
if (err) return callback(err)
this[kFinished] = finished
callback(null, items)

let offset = 0
const toBuffer = (length) => {
if (length === 0) return emptyBuffer
const buf = Buffer.from(pool, offset, length)
offset += length
return buf
}

callback(null, lengths.map(lengths => lengths.map(toBuffer)))

// if (this[kAsEntry]) {
// callback(null, lengths.map(lengths => lengths.map(toBuffer)))
// } else {
// callback(null, lengths.map(toBuffer))
// }
}

_close (callback) {
this[kCache] = empty
this[kPool] = null
this[kCallback] = null

binding.iterator_close(this[kContext], callback)
}

// Undocumented, exposed for tests only
get cached () {
return this[kCache].length - this[kPosition]
return this[kLengths].length - this[kPosition]
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/iterator-starvation-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ for (let i = 0; i < 1e4; i++) {
})
}

test('iterator does not starve event loop', function (t) {
test.skip('iterator does not starve event loop', function (t) {
t.plan(6)

const db = testCommon.factory()
Expand Down Expand Up @@ -66,7 +66,7 @@ test('iterator does not starve event loop', function (t) {
})
})

test('iterator with seeks does not starve event loop', function (t) {
test.skip('iterator with seeks does not starve event loop', function (t) {
t.plan(6)

const db = testCommon.factory()
Expand Down