feat: deterministic DAG structure (#36)
This PR changes the sharding strategy to ensure a pail DAG is deterministic in structure (provided operations do not touch the same keys), i.e. operations can be applied in any order and will result in the same DAG.

This makes diffing easier, and can enable pail to operate efficiently in a diff syncing environment without a merkle clock.
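
For illustration, here is a minimal sketch based on the benchmark code touched in this PR - it assumes `ShardBlock.create()` and `new MemoryBlockstore()` for setup (as in the benchmarks' collapsed setup code) and reuses the `randomCID` test helper. Applying the same puts in two different orders should now yield the same root CID:

```js
// Minimal sketch - setup details are assumptions based on the benchmark files in this PR.
import { put } from '../src/index.js'
import { ShardBlock } from '../src/shard.js'
import { MemoryBlockstore } from '../src/block.js'
import { randomCID } from '../test/helpers.js'

const kvs = [
  ['aaaa', await randomCID(32)],
  ['aabb', await randomCID(32)],
  ['abbb', await randomCID(32)]
]

/** Apply the puts in the given order and return the resulting root CID. */
const applyAll = async ordered => {
  const rootBlock = await ShardBlock.create() // empty root shard (assumption: default options)
  const blocks = new MemoryBlockstore()
  blocks.putSync(rootBlock.cid, rootBlock.bytes)
  let root = rootBlock.cid
  for (const [k, v] of ordered) {
    const result = await put(blocks, root, k, v)
    for (const b of result.additions) blocks.putSync(b.cid, b.bytes)
    for (const b of result.removals) blocks.deleteSync(b.cid)
    root = result.root
  }
  return root
}

const a = await applyAll(kvs)
const b = await applyAll([...kvs].reverse())
console.log(a.toString() === b.toString()) // -> true: same DAG, same root
```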

It also results in a significant speedup to `put` operations, due to the reduced number of string comparison and manipulation operations needed.

```console
$ node bench/put-x10_000.js   
setup
bench
..........
put x10000: 51.736s

$ node bench/put-x10_000-v1.js
setup
bench
..........
put x10000: 1.136s
```

While the strategy (and configuration) for shards has changed, the main format has not. This means that v1 can _read_ v0 but should not write to it.

The change in the sharding strategy is that instead of sharding at the back of a key (on the largest common prefix) when the size limit is reached, we now shard at the front, on the smallest common prefix (always a single character), whenever a common prefix exists. This ensures consistent sharding.

For example, previously when putting `aaaa` and then `aabb` we might end up with a DAG like this when the shard size is reached:

```
aa -> aa
      bb
```

If we then put `abbb` we might then get a DAG like:

```
aa -> aa
      bb
abbb
```

...but if `abbb` was put BEFORE `aabb` we may have ended up with a DAG like:

```
a -> aaa
     abb
     bbb
```

Now we _always_ end up with a DAG like the following, because we _always_ shard when there's a common prefix, independent of the order and independent of shard size:

```
a -> a -> aa
          bb
     bbb
```
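
For illustration, here is a tiny sketch of the rule - not pail's actual shard code, with nested Maps standing in for sub-shards and an empty-string key standing in for a value stored on the shard itself - showing that the structure above falls out regardless of insertion order:

```js
// Illustrative sketch only - not pail's implementation.
const putKey = (shard, key, value) => {
  const sub = shard.get(key[0])
  if (sub instanceof Map) return putKey(sub, key.slice(1), value) // descend into existing sub-shard
  for (const existing of shard.keys()) {
    if (existing !== key && existing !== '' && existing[0] === key[0]) {
      // common first character: push both keys down into a new sub-shard keyed by it
      const child = new Map()
      const existingValue = shard.get(existing)
      shard.delete(existing)
      shard.set(key[0], child)
      putKey(child, existing.slice(1), existingValue)
      return putKey(child, key.slice(1), value)
    }
  }
  shard.set(key, value)
}

const root = new Map()
for (const k of ['aaaa', 'aabb', 'abbb']) putKey(root, k, true)
// root -> { a: { a: { aa: true, bb: true }, bbb: true } }, whatever order the keys are put in
```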

That is to say, there is no maximum shard size, and the max key length is now absolute, not just the point at which a shard is created.

The maximum possible size of a shard is bounded by 2 options specified by the creator of the pail:

* `keyChars` - the characters allowed in keys (default ASCII only).
* `maxKeySize` - maximum size in bytes a UTF-8 encoded key is allowed to be (default 4096 bytes).

A good estimate for the max size of a shard is the number of characters allowed in keys multiplied by the maximum key size. For ASCII keys of 4096 bytes the biggest possible shard will stay well within the maximum block size allowed by libp2p.
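
As a back-of-envelope check, assuming an ASCII-only alphabet of ~128 possible first characters and the default 4096 byte max key size:

```js
// Rough upper bound on the key bytes a single shard can hold with the defaults above.
const maxEntriesPerShard = 128 // at most one entry per allowed first character (assumption: full ASCII range)
const maxKeySize = 4096 // default maximum size of a UTF-8 encoded key, in bytes
console.log(maxEntriesPerShard * maxKeySize) // 524288 bytes ≈ 512 KiB
// ...well under the ~1 MiB block size commonly enforced for libp2p/bitswap transfers.
```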

Note: The default max key size is the same as `MAX_PATH` - the maximum filename+path length on most Windows/Unix systems - so it should be sufficient for most purposes.

Note: even if you use Unicode key characters it would still be tricky to exceed the maximum libp2p block size, but it is not impossible.

Overall this skews pail less towards quick reads and more towards quick writes. This is contrary to the original goal of the project, but on balance I think it is worth the trade-offs, in view of the DAG structure properties and the write speed increase.

I discovered a bug with `removals` in the current implementation: in theory, if you put the same value to multiple keys and then update it, a shard might appear in the `removals` list that is actually a shard in a different part of the tree. The new version fixes this by encoding the path prefix into each shard.
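
To illustrate the likely cause - this sketch uses `@ipld/dag-cbor` and `multiformats` directly, and the shard shape shown is hypothetical - identical shards in different subtrees encode to identical blocks and so share a CID, which is what makes a `removals` entry ambiguous; encoding the prefix makes the blocks distinct:

```js
import * as cbor from '@ipld/dag-cbor'
import { sha256 } from 'multiformats/hashes/sha2'
import { CID } from 'multiformats/cid'

// Link a value the way an IPLD block would be linked (dag-cbor + sha2-256).
const link = async value => CID.createV1(cbor.code, await sha256.digest(cbor.encode(value)))

// Hypothetical shard shape, for illustration only: two sub-shards in different parts of
// the tree with identical entries are identical blocks, so they share a CID.
console.log((await link({ entries: [['x', 0]] })).equals(await link({ entries: [['x', 0]] }))) // true

// With the path prefix encoded into each shard, the blocks (and CIDs) differ.
console.log((await link({ prefix: 'a', entries: [['x', 0]] })).equals(await link({ prefix: 'b', entries: [['x', 0]] }))) // false
```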

The following trade-offs exist:

* Total DAG size is around 20% bigger.
* There are around 8x more blocks in the DAG.
* Average DAG depth (the number of blocks you need to traverse to extract a value) is now ~4 vs ~2 previously. This will have a direct impact on read speed, especially for network bound reads. It is mitigated somewhat by the smaller block size.

However:

* Put operations are now ~50x faster.
* Average block size is 85% smaller.
* The biggest shard in the DAG is now 99% smaller.
* DAG structure is deterministic, which should speed up diffing.
Alan Shaw authored Apr 22, 2024
1 parent 3f90935 commit a26acea
Showing 23 changed files with 952 additions and 713 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,5 +1,5 @@
node_modules
coverage
pail.car
*.car
dist
.clinic
40 changes: 26 additions & 14 deletions bench/put-x10_000-batcher.js
@@ -4,6 +4,7 @@ import * as Batch from '../src/batch/index.js'
import { ShardBlock } from '../src/shard.js'
import { MemoryBlockstore } from '../src/block.js'
import { randomCID, randomString, randomInteger } from '../test/helpers.js'
import { collectMetrics, writePail } from './util.js'

const NUM = 10_000

@@ -23,24 +24,35 @@ async function main () {
kvs.push([k, v])
}

/** @type {API.ShardLink} */
let root = rootBlock.cid
console.log('bench')
console.time(`put x${NUM}`)
const batch = await Batch.create(blocks, rootBlock.cid)
for (let i = 0; i < kvs.length; i++) {
await batch.put(kvs[i][0], kvs[i][1])
if (i % 1000 === 0) {
process.stdout.write('.')
try {
const batch = await Batch.create(blocks, rootBlock.cid)
for (let i = 0; i < kvs.length; i++) {
await batch.put(kvs[i][0], kvs[i][1])
if (i % 1000 === 0) {
process.stdout.write('.')
}
}
const result = await batch.commit()
for (const b of result.additions) {
blocks.putSync(b.cid, b.bytes)
}
for (const b of result.removals) {
blocks.deleteSync(b.cid)
}
root = result.root
} catch (err) {
console.log('')
console.error(err)
} finally {
console.log('')
console.timeEnd(`put x${NUM}`)
await writePail(blocks, root)
console.log(await collectMetrics(blocks, root))
}
const result = await batch.commit()
for (const b of result.additions) {
blocks.putSync(b.cid, b.bytes)
}
for (const b of result.removals) {
blocks.deleteSync(b.cid)
}
console.log('')
console.timeEnd(`put x${NUM}`)
}

main()
41 changes: 25 additions & 16 deletions bench/put-x10_000.js
@@ -4,6 +4,7 @@ import { put } from '../src/index.js'
import { ShardBlock } from '../src/shard.js'
import { MemoryBlockstore } from '../src/block.js'
import { randomCID, randomString, randomInteger } from '../test/helpers.js'
import { collectMetrics, writePail } from './util.js'

const NUM = 10_000

@@ -16,32 +17,40 @@ async function main () {

/** @type {Array<[string, API.UnknownLink]>} */
const kvs = []

for (let i = 0; i < NUM; i++) {
const k = randomString(randomInteger(1, 64))
const v = await randomCID(randomInteger(8, 128))
kvs.push([k, v])
}

console.log('bench')
console.time(`put x${NUM}`)
/** @type {API.ShardLink} */
let root = rootBlock.cid
for (let i = 0; i < kvs.length; i++) {
const result = await put(blocks, root, kvs[i][0], kvs[i][1])
for (const b of result.additions) {
blocks.putSync(b.cid, b.bytes)
}
for (const b of result.removals) {
blocks.deleteSync(b.cid)
}
root = result.root
if (i % 1000 === 0) {
process.stdout.write('.')
console.log('bench')
console.time(`put x${NUM}`)

try {
for (let i = 0; i < kvs.length; i++) {
const result = await put(blocks, root, kvs[i][0], kvs[i][1])
for (const b of result.additions) {
blocks.putSync(b.cid, b.bytes)
}
for (const b of result.removals) {
blocks.deleteSync(b.cid)
}
root = result.root
if (i % 1000 === 0) {
process.stdout.write('.')
}
}
} catch (err) {
console.log('')
console.error(err)
} finally {
console.log('')
console.timeEnd(`put x${NUM}`)
await writePail(blocks, root)
console.log(await collectMetrics(blocks, root))
}
console.log('')
console.timeEnd(`put x${NUM}`)
}

main()
77 changes: 77 additions & 0 deletions bench/util.js
@@ -0,0 +1,77 @@
import fs from 'fs'
import { Readable } from 'stream'
import { CarWriter } from '@ipld/car'
// eslint-disable-next-line no-unused-vars
import * as API from '../src/api.js'
import { get, entries } from '../src/index.js'
import { MemoryBlockstore } from '../src/block.js'
import { ShardFetcher } from '../src/shard.js'

/**
 * @param {MemoryBlockstore} blocks
 * @param {API.ShardLink} root
 */
export const writePail = async (blocks, root) => {
  // @ts-expect-error
  const { writer, out } = CarWriter.create(root)
  const finishPromise = new Promise(resolve => {
    Readable.from(out).pipe(fs.createWriteStream('./bench.car')).on('finish', resolve)
  })

  for (const b of blocks.entries()) {
    // @ts-expect-error
    await writer.put(b)
  }
  await writer.close()
  await finishPromise
}

/**
 * @param {MemoryBlockstore} blocks
 * @param {API.ShardLink} root
 */
export const collectMetrics = async (blocks, root) => {
  const shards = new ShardFetcher(blocks)
  const rshard = await shards.get(root)

  let maxDepth = 0
  let totalDepth = 0
  let totalEntries = 0

  let totalShards = 1
  let maxShardSize = rshard.bytes.length
  let totalSize = rshard.bytes.length

  /**
   * @param {API.ShardEntry} entry
   * @param {number} depth
   */
  const collectData = async ([, v], depth) => {
    if (!Array.isArray(v)) {
      totalEntries++
      maxDepth = depth > maxDepth ? depth : maxDepth
      totalDepth += depth
      return
    }
    if (v[1]) totalEntries++
    const blk = await shards.get(v[0])
    totalShards++
    maxShardSize = blk.bytes.length > maxShardSize ? blk.bytes.length : maxShardSize
    totalSize += blk.bytes.length
    return Promise.all(blk.value.entries.map(e => collectData(e, depth + 1)))
  }

  for (const entry of rshard.value.entries) {
    await collectData(entry, 1)
  }

  return {
    maxDepth,
    avgDepth: Math.round(totalDepth / totalEntries),
    totalEntries,
    totalShards,
    maxShardSize,
    avgShardSize: Math.round(totalSize / totalShards),
    totalSize
  }
}