diff --git a/.gitignore b/.gitignore index b0818cfa..793ad9f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ node_modules coverage -pail.car +*.car dist .clinic diff --git a/bench/put-x10_000-batcher.js b/bench/put-x10_000-batcher.js index 1da25f1b..c8e29fef 100644 --- a/bench/put-x10_000-batcher.js +++ b/bench/put-x10_000-batcher.js @@ -4,6 +4,7 @@ import * as Batch from '../src/batch/index.js' import { ShardBlock } from '../src/shard.js' import { MemoryBlockstore } from '../src/block.js' import { randomCID, randomString, randomInteger } from '../test/helpers.js' +import { collectMetrics, writePail } from './util.js' const NUM = 10_000 @@ -23,24 +24,35 @@ async function main () { kvs.push([k, v]) } + /** @type {API.ShardLink} */ + let root = rootBlock.cid console.log('bench') console.time(`put x${NUM}`) - const batch = await Batch.create(blocks, rootBlock.cid) - for (let i = 0; i < kvs.length; i++) { - await batch.put(kvs[i][0], kvs[i][1]) - if (i % 1000 === 0) { - process.stdout.write('.') + try { + const batch = await Batch.create(blocks, rootBlock.cid) + for (let i = 0; i < kvs.length; i++) { + await batch.put(kvs[i][0], kvs[i][1]) + if (i % 1000 === 0) { + process.stdout.write('.') + } } + const result = await batch.commit() + for (const b of result.additions) { + blocks.putSync(b.cid, b.bytes) + } + for (const b of result.removals) { + blocks.deleteSync(b.cid) + } + root = result.root + } catch (err) { + console.log('') + console.error(err) + } finally { + console.log('') + console.timeEnd(`put x${NUM}`) + await writePail(blocks, root) + console.log(await collectMetrics(blocks, root)) } - const result = await batch.commit() - for (const b of result.additions) { - blocks.putSync(b.cid, b.bytes) - } - for (const b of result.removals) { - blocks.deleteSync(b.cid) - } - console.log('') - console.timeEnd(`put x${NUM}`) } main() diff --git a/bench/put-x10_000.js b/bench/put-x10_000.js index a50b07cd..a5e46ab8 100644 --- a/bench/put-x10_000.js +++ b/bench/put-x10_000.js @@ -4,6 +4,7 @@ import { put } from '../src/index.js' import { ShardBlock } from '../src/shard.js' import { MemoryBlockstore } from '../src/block.js' import { randomCID, randomString, randomInteger } from '../test/helpers.js' +import { collectMetrics, writePail } from './util.js' const NUM = 10_000 @@ -16,32 +17,40 @@ async function main () { /** @type {Array<[string, API.UnknownLink]>} */ const kvs = [] - for (let i = 0; i < NUM; i++) { const k = randomString(randomInteger(1, 64)) const v = await randomCID(randomInteger(8, 128)) kvs.push([k, v]) } - console.log('bench') - console.time(`put x${NUM}`) /** @type {API.ShardLink} */ let root = rootBlock.cid - for (let i = 0; i < kvs.length; i++) { - const result = await put(blocks, root, kvs[i][0], kvs[i][1]) - for (const b of result.additions) { - blocks.putSync(b.cid, b.bytes) - } - for (const b of result.removals) { - blocks.deleteSync(b.cid) - } - root = result.root - if (i % 1000 === 0) { - process.stdout.write('.') + console.log('bench') + console.time(`put x${NUM}`) + + try { + for (let i = 0; i < kvs.length; i++) { + const result = await put(blocks, root, kvs[i][0], kvs[i][1]) + for (const b of result.additions) { + blocks.putSync(b.cid, b.bytes) + } + for (const b of result.removals) { + blocks.deleteSync(b.cid) + } + root = result.root + if (i % 1000 === 0) { + process.stdout.write('.') + } } + } catch (err) { + console.log('') + console.error(err) + } finally { + console.log('') + console.timeEnd(`put x${NUM}`) + await writePail(blocks, root) + console.log(await collectMetrics(blocks, root)) } - console.log('') - console.timeEnd(`put x${NUM}`) } main() diff --git a/bench/util.js b/bench/util.js new file mode 100644 index 00000000..677d3b81 --- /dev/null +++ b/bench/util.js @@ -0,0 +1,77 @@ +import fs from 'fs' +import { Readable } from 'stream' +import { CarWriter } from '@ipld/car' +// eslint-disable-next-line no-unused-vars +import * as API from '../src/api.js' +import { get, entries } from '../src/index.js' +import { MemoryBlockstore } from '../src/block.js' +import { ShardFetcher } from '../src/shard.js' + +/** + * @param {MemoryBlockstore} blocks + * @param {API.ShardLink} root + */ +export const writePail = async (blocks, root) => { + // @ts-expect-error + const { writer, out } = CarWriter.create(root) + const finishPromise = new Promise(resolve => { + Readable.from(out).pipe(fs.createWriteStream('./bench.car')).on('finish', resolve) + }) + + for (const b of blocks.entries()) { + // @ts-expect-error + await writer.put(b) + } + await writer.close() + await finishPromise +} + +/** + * @param {MemoryBlockstore} blocks + * @param {API.ShardLink} root + */ +export const collectMetrics = async (blocks, root) => { + const shards = new ShardFetcher(blocks) + const rshard = await shards.get(root) + + let maxDepth = 0 + let totalDepth = 0 + let totalEntries = 0 + + let totalShards = 1 + let maxShardSize = rshard.bytes.length + let totalSize = rshard.bytes.length + + /** + * @param {API.ShardEntry} entry + * @param {number} depth + */ + const collectData = async ([, v], depth) => { + if (!Array.isArray(v)) { + totalEntries++ + maxDepth = depth > maxDepth ? depth : maxDepth + totalDepth += depth + return + } + if (v[1]) totalEntries++ + const blk = await shards.get(v[0]) + totalShards++ + maxShardSize = blk.bytes.length > maxShardSize ? blk.bytes.length : maxShardSize + totalSize += blk.bytes.length + return Promise.all(blk.value.entries.map(e => collectData(e, depth + 1))) + } + + for (const entry of rshard.value.entries) { + await collectData(entry, 1) + } + + return { + maxDepth, + avgDepth: Math.round(totalDepth / totalEntries), + totalEntries, + totalShards, + maxShardSize, + avgShardSize: Math.round(totalSize / totalShards), + totalSize + } +} diff --git a/cli.js b/cli.js index 1f092e0c..c2b31f2c 100755 --- a/cli.js +++ b/cli.js @@ -2,18 +2,19 @@ import fs from 'fs' import os from 'os' import { join } from 'path' -import { Readable } from 'stream' +import { Readable, Writable } from 'stream' import sade from 'sade' import { CID } from 'multiformats/cid' -import { CarIndexedReader, CarReader, CarWriter } from '@ipld/car' +import { CARReaderStream, CARWriterStream } from 'carstream' import clc from 'cli-color' import archy from 'archy' // eslint-disable-next-line no-unused-vars import * as API from './src/api.js' import { put, get, del, entries } from './src/index.js' -import { ShardFetcher, ShardBlock, MaxShardSize } from './src/shard.js' +import { ShardFetcher, ShardBlock, isShardLink } from './src/shard.js' import { difference } from './src/diff.js' import { merge } from './src/merge.js' +import { MemoryBlockstore, MultiBlockFetcher } from './src/block.js' const cli = sade('pail') .option('--path', 'Path to data store.', './pail.car') @@ -21,77 +22,64 @@ const cli = sade('pail') cli.command('put ') .describe('Put a value (a CID) for the given key. If the key exists it\'s value is overwritten.') .alias('set') - .option('--max-shard-size', 'Maximum shard size in bytes.', MaxShardSize) .action(async (key, value, opts) => { - const maxShardSize = opts['max-shard-size'] ?? MaxShardSize - const blocks = await openPail(opts.path, { maxSize: maxShardSize }) - const roots = await blocks.getRoots() - // @ts-expect-error - const { root, additions, removals } = await put(blocks, roots[0], key, CID.parse(value)) + const { root: prevRoot, blocks } = await openPail(opts.path) + const { root, additions, removals } = await put(blocks, prevRoot, key, CID.parse(value)) await updatePail(opts.path, blocks, root, { additions, removals }) - console.log(clc.red(`--- ${roots[0]}`)) + console.log(clc.red(`--- ${prevRoot}`)) console.log(clc.green(`+++ ${root}`)) console.log(clc.magenta('@@ -1 +1 @@')) additions.forEach(b => console.log(clc.green(`+${b.cid}`))) removals.forEach(b => console.log(clc.red(`-${b.cid}`))) - await closePail(blocks) }) cli.command('get ') .describe('Get the stored value for the given key from the pail. If the key is not found, `undefined` is returned.') .action(async (key, opts) => { - const blocks = await openPail(opts.path) - // @ts-expect-error - const value = await get(blocks, (await blocks.getRoots())[0], key) + const { root, blocks } = await openPail(opts.path) + const value = await get(blocks, root, key) if (value) console.log(value.toString()) - await closePail(blocks) }) cli.command('del ') .describe('Delete the value for the given key from the pail. If the key is not found no operation occurs.') .alias('delete', 'rm', 'remove') .action(async (key, opts) => { - const blocks = await openPail(opts.path) - const roots = await blocks.getRoots() - // @ts-expect-error - const { root, additions, removals } = await del(blocks, roots[0], key) + const { root: prevRoot, blocks } = await openPail(opts.path) + const { root, additions, removals } = await del(blocks, prevRoot, key) await updatePail(opts.path, blocks, root, { additions, removals }) - console.log(clc.red(`--- ${roots[0]}`)) + console.log(clc.red(`--- ${prevRoot}`)) console.log(clc.green(`+++ ${root}`)) console.log(clc.magenta('@@ -1 +1 @@')) additions.forEach(b => console.log(clc.green(`+ ${b.cid}`))) removals.forEach(b => console.log(clc.red(`- ${b.cid}`))) - await closePail(blocks) }) cli.command('ls') .describe('List entries in the pail.') .alias('list') .option('-p, --prefix', 'Key prefix to filter by.') + .option('--gt', 'Filter results by keys greater than this string.') + .option('--lt', 'Filter results by keys less than this string.') .option('--json', 'Format output as newline delimted JSON.') .action(async (opts) => { - const blocks = await openPail(opts.path) - const root = (await blocks.getRoots())[0] + const { root, blocks } = await openPail(opts.path) let n = 0 - // @ts-expect-error - for await (const [k, v] of entries(blocks, root, { prefix: opts.prefix })) { + for await (const [k, v] of entries(blocks, root, { prefix: opts.prefix, gt: opts.gt, lt: opts.lt })) { console.log(opts.json ? JSON.stringify({ key: k, value: v.toString() }) : `${k}\t${v}`) n++ } if (!opts.json) console.log(`total ${n}`) - await closePail(blocks) }) cli.command('tree') .describe('Visualise the pail.') + .alias('vis') .action(async (opts) => { - const blocks = await openPail(opts.path) - const root = (await blocks.getRoots())[0] - // @ts-expect-error + const { root, blocks } = await openPail(opts.path) const shards = new ShardFetcher(blocks) - // @ts-expect-error const rshard = await shards.get(root) /** @type {archy.Data} */ @@ -118,27 +106,19 @@ cli.command('tree') } console.log(archy(archyRoot)) - await closePail(blocks) }) cli.command('diff ') .describe('Find the differences between this pail and the passed pail.') .option('-k, --keys', 'Output key/value diff.') .action(async (path, opts) => { - const [ablocks, bblocks] = await Promise.all([openPail(opts.path), openPail(path)]) - const [aroot, broot] = await Promise.all([ablocks, bblocks].map(async blocks => { - return /** @type {API.ShardLink} */((await blocks.getRoots())[0]) - })) + const [ + { root: aroot, blocks: ablocks }, + { root: broot, blocks: bblocks } + ] = await Promise.all([openPail(opts.path), openPail(path)]) if (aroot.toString() === broot.toString()) return - const fetcher = { - async get (cid) { - const blk = await ablocks.get(cid) - if (blk) return blk - return bblocks.get(cid) - } - } - // @ts-expect-error CarReader is not BlockFetcher + const fetcher = new MultiBlockFetcher(ablocks, bblocks) const { shards: { additions, removals }, keys } = await difference(fetcher, aroot, broot) console.log(clc.red(`--- ${aroot}`)) @@ -154,27 +134,18 @@ cli.command('diff ') additions.forEach(b => console.log(clc.green(`+ ${b.cid}`))) removals.forEach(b => console.log(clc.red(`- ${b.cid}`))) } - - await Promise.all([closePail(ablocks), closePail(bblocks)]) }) cli.command('merge ') .describe('Merge the passed pail into this pail.') .action(async (path, opts) => { - const [ablocks, bblocks] = await Promise.all([openPail(opts.path), openPail(path)]) - const [aroot, broot] = await Promise.all([ablocks, bblocks].map(async blocks => { - return /** @type {API.ShardLink} */((await blocks.getRoots())[0]) - })) + const [ + { root: aroot, blocks: ablocks }, + { root: broot, blocks: bblocks } + ] = await Promise.all([openPail(opts.path), openPail(path)]) if (aroot.toString() === broot.toString()) return - const fetcher = { - async get (cid) { - const blk = await ablocks.get(cid) - if (blk) return blk - return bblocks.get(cid) - } - } - // @ts-expect-error CarReader is not BlockFetcher + const fetcher = new MultiBlockFetcher(ablocks, bblocks) const { root, additions, removals } = await merge(fetcher, aroot, [broot]) await updatePail(opts.path, ablocks, root, { additions, removals }) @@ -184,65 +155,53 @@ cli.command('merge ') console.log(clc.magenta('@@ -1 +1 @@')) additions.forEach(b => console.log(clc.green(`+ ${b.cid}`))) removals.forEach(b => console.log(clc.red(`- ${b.cid}`))) - - await Promise.all([closePail(ablocks), closePail(bblocks)]) }) cli.parse(process.argv) /** * @param {string} path - * @param {{ maxSize?: number }} [config] - * @returns {Promise} + * @returns {Promise<{ root: API.ShardLink, blocks: MemoryBlockstore }>} */ -async function openPail (path, config) { +async function openPail (path) { + const blocks = new MemoryBlockstore() try { - return await CarIndexedReader.fromFile(path) + const carReader = new CARReaderStream() + const readable = /** @type {ReadableStream} */ (Readable.toWeb(fs.createReadStream(path))) + await readable.pipeThrough(carReader).pipeTo(new WritableStream({ write: b => blocks.put(b.cid, b.bytes) })) + const header = await carReader.getHeader() + if (!isShardLink(header.roots[0])) throw new Error(`not a shard: ${header.roots[0]}`) + return { root: header.roots[0], blocks } } catch (err) { if (err.code !== 'ENOENT') throw new Error('failed to open bucket', { cause: err }) - const rootblk = await ShardBlock.create(config) - const { writer, out } = CarWriter.create(rootblk.cid) - writer.put(rootblk) - writer.close() - return CarReader.fromIterable(out) - } -} - -/** @param {import('@ipld/car/api').CarReader} reader */ -async function closePail (reader) { - if (reader instanceof CarIndexedReader) { - await reader.close() + const rootblk = await ShardBlock.create() + blocks.put(rootblk.cid, rootblk.bytes) + return { root: rootblk.cid, blocks } } } /** * @param {string} path - * @param {import('@ipld/car/api').CarReader} reader + * @param {MemoryBlockstore} blocks * @param {API.ShardLink} root * @param {API.ShardDiff} diff */ -async function updatePail (path, reader, root, { additions, removals }) { - // @ts-expect-error - const { writer, out } = CarWriter.create(root) +async function updatePail (path, blocks, root, { additions, removals }) { const tmp = join(os.tmpdir(), `pail${Date.now()}.car`) - - const finishPromise = new Promise(resolve => { - Readable.from(out).pipe(fs.createWriteStream(tmp)).on('finish', resolve) - }) - - // put new blocks - for (const b of additions) { - await writer.put(b) - } - // put old blocks without removals - for await (const b of reader.blocks()) { - if (removals.some(r => b.cid.toString() === r.cid.toString())) { - continue + const iterator = blocks.entries() + const readable = new ReadableStream({ + start (controller) { + for (const b of additions) controller.enqueue(b) + }, + pull (controller) { + for (const b of iterator) { + if (removals.some(r => b.cid.toString() === r.cid.toString())) continue + return controller.enqueue(b) + } + controller.close() } - await writer.put(b) - } - await writer.close() - await finishPromise + }) + await readable.pipeThrough(new CARWriterStream([root])).pipeTo(Writable.toWeb(fs.createWriteStream(tmp))) const old = `${path}-${new Date().toISOString()}` try { diff --git a/package.json b/package.json index 75416024..cef8c1df 100644 --- a/package.json +++ b/package.json @@ -143,18 +143,17 @@ "dist" ], "dependencies": { - "@ipld/car": "^5.2.4", - "@ipld/dag-cbor": "^9.0.6", - "archy": "^1.0.0", - "cborg": "^4.0.7", - "cli-color": "^2.0.3", - "multiformats": "^12.1.3", - "sade": "^1.8.1" + "@ipld/dag-cbor": "^9.2.0", + "multiformats": "^13.1.0" }, "devDependencies": { + "archy": "^1.0.0", "c8": "^8.0.1", + "carstream": "^2.0.0", + "cli-color": "^2.0.3", "mocha": "^10.2.0", "nanoid": "^4.0.0", + "sade": "^1.8.1", "standard": "^17.0.0", "typescript": "^5.0.2" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8d041cf8..0c5d960b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -5,38 +5,35 @@ settings: excludeLinksFromLockfile: false dependencies: - '@ipld/car': - specifier: ^5.2.4 - version: 5.2.6 '@ipld/dag-cbor': - specifier: ^9.0.6 - version: 9.0.8 - archy: - specifier: ^1.0.0 - version: 1.0.0 - cborg: - specifier: ^4.0.7 - version: 4.0.7 - cli-color: - specifier: ^2.0.3 - version: 2.0.3 + specifier: ^9.2.0 + version: 9.2.0 multiformats: - specifier: ^12.1.3 - version: 12.1.3 - sade: - specifier: ^1.8.1 - version: 1.8.1 + specifier: ^13.1.0 + version: 13.1.0 devDependencies: + archy: + specifier: ^1.0.0 + version: 1.0.0 c8: specifier: ^8.0.1 version: 8.0.1 + carstream: + specifier: ^2.0.0 + version: 2.0.0 + cli-color: + specifier: ^2.0.3 + version: 2.0.3 mocha: specifier: ^10.2.0 version: 10.2.0 nanoid: specifier: ^4.0.0 version: 4.0.2 + sade: + specifier: ^1.8.1 + version: 1.8.1 standard: specifier: ^17.0.0 version: 17.1.0 @@ -112,23 +109,12 @@ packages: resolution: {integrity: sha512-dvuCeX5fC9dXgJn9t+X5atfmgQAzUOWqS1254Gh0m6i8wKd10ebXkfNKiRK+1GWi/yTvvLDHpoxLr0xxxeslWw==} dev: true - /@ipld/car@5.2.6: - resolution: {integrity: sha512-ZiIYan7UFLLQsR90GpKOrZ0t6/6owrevJI7dCG8McNj0zUO4vGzsPumpKRBP4pdBgek4oXt4TbFOwxqTPEh5mA==} + /@ipld/dag-cbor@9.2.0: + resolution: {integrity: sha512-N14oMy0q4gM6OuZkIpisKe0JBSjf1Jb39VI+7jMLiWX9124u1Z3Fdj/Tag1NA0cVxxqWDh0CqsjcVfOKtelPDA==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} dependencies: - '@ipld/dag-cbor': 9.0.8 cborg: 4.0.7 - multiformats: 13.0.1 - varint: 6.0.0 - dev: false - - /@ipld/dag-cbor@9.0.8: - resolution: {integrity: sha512-ETWJ7p7lmGw5X+BuI/7rf4/k56xyOvAOVNUVuQmnGYBdJjObLPgS+vyFxRk4odATlkyZqCq2MLNY52bhE6SlRA==} - engines: {node: '>=16.0.0', npm: '>=7.0.0'} - dependencies: - cborg: 4.0.7 - multiformats: 13.0.1 - dev: false + multiformats: 13.1.0 /@istanbuljs/schema@0.1.3: resolution: {integrity: sha512-ZXRY4jNvVgSVQ8DL3LTcakaAtXwTVUxE81hslsyD2AtoXW/wVob10HkOJ1X/pAlcI7D+2YoZKg5do8G/w6RYgA==} @@ -234,7 +220,7 @@ packages: /archy@1.0.0: resolution: {integrity: sha512-Xg+9RwCg/0p32teKdGMPTPnVXKD0w3DfHnFTficozsAgsvq2XenPJq/MYpzzQ/v8zrOyJn6Ds39VA4JIDwFfqw==} - dev: false + dev: true /argparse@2.0.1: resolution: {integrity: sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==} @@ -399,10 +385,17 @@ packages: engines: {node: '>=10'} dev: true + /carstream@2.0.0: + resolution: {integrity: sha512-vmxqp4Quj4rVdkoXBzcQ3UzeH0JR7aXLr9rkx07pLp0gtPJyRxIU+9HZb94lKsaXxogM6EFRApEHusINPjsUuw==} + dependencies: + '@ipld/dag-cbor': 9.2.0 + multiformats: 13.1.0 + uint8arraylist: 2.4.8 + dev: true + /cborg@4.0.7: resolution: {integrity: sha512-5h2n7973T4dkY2XLfHpwYR9IjeDSfolZibYtb8clW53BvvgTyq+X2EtwrjfhTAETrwaQOX4+lRua14/XjbZHaQ==} hasBin: true - dev: false /chalk@4.1.2: resolution: {integrity: sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==} @@ -436,7 +429,7 @@ packages: es6-iterator: 2.0.3 memoizee: 0.4.15 timers-ext: 0.1.7 - dev: false + dev: true /cliui@7.0.4: resolution: {integrity: sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==} @@ -488,7 +481,7 @@ packages: dependencies: es5-ext: 0.10.62 type: 1.2.0 - dev: false + dev: true /debug@3.2.7: resolution: {integrity: sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==} @@ -666,7 +659,7 @@ packages: es6-iterator: 2.0.3 es6-symbol: 3.1.3 next-tick: 1.1.0 - dev: false + dev: true /es6-iterator@2.0.3: resolution: {integrity: sha512-zw4SRzoUkd+cl+ZoE15A9o1oQd920Bb0iOJMQkQhl3jNc03YqVjAhG7scf9C5KWRU/R13Orf588uCC6525o02g==} @@ -674,14 +667,14 @@ packages: d: 1.0.1 es5-ext: 0.10.62 es6-symbol: 3.1.3 - dev: false + dev: true /es6-symbol@3.1.3: resolution: {integrity: sha512-NJ6Yn3FuDinBaBRWl/q5X/s4koRHBrgKAu+yGI6JCBeiu3qrcbJhwT2GeR/EXVfylRk8dpQVJoLEFhK+Mu31NA==} dependencies: d: 1.0.1 ext: 1.7.0 - dev: false + dev: true /es6-weak-map@2.0.3: resolution: {integrity: sha512-p5um32HOTO1kP+w7PRnB+5lQ43Z6muuMuIMffvDN8ZB4GcnjLBV6zGStpbASIMk4DCAvEaamhe2zhyCb/QXXsA==} @@ -690,7 +683,7 @@ packages: es5-ext: 0.10.62 es6-iterator: 2.0.3 es6-symbol: 3.1.3 - dev: false + dev: true /escalade@3.1.1: resolution: {integrity: sha512-k0er2gUkLf8O0zKJiAhmkTnJlTvINGv7ygDNPbeIsX/TJjGJZHuh9B2UxbsaEkmlEo9MfhrSzmhIlhRlI2GXnw==} @@ -986,13 +979,13 @@ packages: dependencies: d: 1.0.1 es5-ext: 0.10.62 - dev: false + dev: true /ext@1.7.0: resolution: {integrity: sha512-6hxeJYaL110a9b5TEJSj0gojyHQAmA2ch5Os+ySCiA1QGdS697XWY1pzsrSjqA9LDEEgdB/KypIlR59RcLuHYw==} dependencies: type: 2.7.2 - dev: false + dev: true /fast-deep-equal@3.1.3: resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==} @@ -1401,7 +1394,7 @@ packages: /is-promise@2.2.2: resolution: {integrity: sha512-+lP4/6lKUBfQjZ2pdxThZvLUAafmZb8OAxFb8XXtiQmS35INgr85hdOGoEs124ez1FCnZJt6jau/T+alh58QFQ==} - dev: false + dev: true /is-regex@1.1.4: resolution: {integrity: sha512-kvRdxDsxZjhzUX07ZnLydzS1TU/TJlTUHHY4YLL87e37oUA49DfkLqgy+VjFocowy29cKvcSiu+kIv728jTTVg==} @@ -1618,7 +1611,7 @@ packages: resolution: {integrity: sha512-BpdYkt9EvGl8OfWHDQPISVpcl5xZthb+XPsbELj5AQXxIC8IriDZIQYjBJPEm5rS420sjZ0TLEzRcq5KdBhYrQ==} dependencies: es5-ext: 0.10.62 - dev: false + dev: true /make-dir@4.0.0: resolution: {integrity: sha512-hXdUTZYIVOt1Ex//jAQi+wTZZpUpwBj/0QsOzqegb3rGMMeJiSEu5xLHnYfBrRV4RH2+OCSOO95Is/7x1WJ4bw==} @@ -1638,7 +1631,7 @@ packages: lru-queue: 0.1.0 next-tick: 1.1.0 timers-ext: 0.1.7 - dev: false + dev: true /minimatch@3.1.2: resolution: {integrity: sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==} @@ -1688,7 +1681,7 @@ packages: /mri@1.2.0: resolution: {integrity: sha512-tzzskb3bG8LvYGFF/mDTpq3jpI6Q9wc3LEmBaghu+DdCssd1FakN7Bc0hVNmEyGq1bq3RgfkCb3cmQLpNPOroA==} engines: {node: '>=4'} - dev: false + dev: true /ms@2.1.2: resolution: {integrity: sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==} @@ -1698,14 +1691,8 @@ packages: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} dev: true - /multiformats@12.1.3: - resolution: {integrity: sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==} - engines: {node: '>=16.0.0', npm: '>=7.0.0'} - dev: false - - /multiformats@13.0.1: - resolution: {integrity: sha512-bt3R5iXe2O8xpp3wkmQhC73b/lC4S2ihU8Dndwcsysqbydqb8N+bpP116qMcClZ17g58iSIwtXUTcg2zT4sniA==} - dev: false + /multiformats@13.1.0: + resolution: {integrity: sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==} /nanoid@3.3.3: resolution: {integrity: sha512-p1sjXuopFs0xg+fPASzQ28agW1oHD7xDsd9Xkf3T15H3c/cifrFHVwrh74PdoklAPi+i7MdRsE47vm2r6JoB+w==} @@ -1725,7 +1712,7 @@ packages: /next-tick@1.1.0: resolution: {integrity: sha512-CXdUiJembsNjuToQvxayPZF9Vqht7hewsvy2sOWafLvi2awflj9mOC6bHIg50orX8IJvWKY9wYQ/zB2kogPslQ==} - dev: false + dev: true /normalize-path@3.0.0: resolution: {integrity: sha512-6eZs5Ls3WtCisHWp9S2GUy8dqkpGi4BVSz3GaqiE6ezub0512ESztXUwUB6C6IKbQkY2Pnb/mD4WYojCRwcwLA==} @@ -2023,7 +2010,7 @@ packages: engines: {node: '>=6'} dependencies: mri: 1.2.0 - dev: false + dev: true /safe-array-concat@1.0.1: resolution: {integrity: sha512-6XbUAseYE2KtOuGueyeobCySj9L4+66Tn6KQMOPQJrAJEowYKW/YR/MGJZl7FdydUdaFu4LYyDZjxf4/Nmo23Q==} @@ -2243,7 +2230,7 @@ packages: dependencies: es5-ext: 0.10.62 next-tick: 1.1.0 - dev: false + dev: true /to-regex-range@5.0.1: resolution: {integrity: sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==} @@ -2280,11 +2267,11 @@ packages: /type@1.2.0: resolution: {integrity: sha512-+5nt5AAniqsCnu2cEQQdpzCAh33kVx8n0VoFidKpB1dVVLAN/F+bgVOqOJqOnEnrhp222clB5p3vUlD+1QAnfg==} - dev: false + dev: true /type@2.7.2: resolution: {integrity: sha512-dzlvlNlt6AXU7EBSfpAscydQ7gXB+pPGsPnfJnZpiNJBDj7IaJzQlBZYGdEi4R9HmPdBv2XmWJ6YUtoTa7lmCw==} - dev: false + dev: true /typed-array-buffer@1.0.0: resolution: {integrity: sha512-Y8KTSIglk9OZEr8zywiIHG/kmQ7KWyjseXs1CbSo8vC42w7hg2HgYTxSWwP0+is7bWDc1H+Fo026CpHFwm8tkw==} @@ -2330,6 +2317,18 @@ packages: hasBin: true dev: true + /uint8arraylist@2.4.8: + resolution: {integrity: sha512-vc1PlGOzglLF0eae1M8mLRTBivsvrGsdmJ5RbK3e+QRvRLOZfZhQROTwH/OfyF3+ZVUg9/8hE8bmKP2CvP9quQ==} + dependencies: + uint8arrays: 5.0.3 + dev: true + + /uint8arrays@5.0.3: + resolution: {integrity: sha512-6LBuKji28kHjgPJMkQ6GDaBb1lRwIhyOYq6pDGwYMoDPfImE9SkuYENVmR0yu9yGgs2clHUSY9fKDukR+AXfqQ==} + dependencies: + multiformats: 13.1.0 + dev: true + /unbox-primitive@1.0.2: resolution: {integrity: sha512-61pPlCD9h51VoreyJ0BReideM3MDKMKnh6+V9L08331ipq6Q8OFXZYiqP6n/tbHx4s5I9uRhcye6BrbkizkBDw==} dependencies: @@ -2354,10 +2353,6 @@ packages: convert-source-map: 2.0.0 dev: true - /varint@6.0.0: - resolution: {integrity: sha512-cXEIW6cfr15lFv563k4GuVuW/fiwjknytD37jIOLSdSWuOI6WnO/oKwmP2FQTU2l01LP8/M5TSAJpzUaGe3uWg==} - dev: false - /version-guard@1.1.1: resolution: {integrity: sha512-MGQLX89UxmYHgDvcXyjBI0cbmoW+t/dANDppNPrno64rYr8nH4SHSuElQuSYdXGEs0mUzdQe1BY+FhVPNsAmJQ==} engines: {node: '>=0.10.48'} diff --git a/src/api.ts b/src/api.ts index 8748b15f..5b401a71 100644 --- a/src/api.ts +++ b/src/api.ts @@ -23,9 +23,7 @@ export interface Shard extends ShardConfig { export type ShardLink = Link -export interface ShardBlockView extends BlockView { - prefix: string -} +export interface ShardBlockView extends BlockView {} export interface ShardDiff { additions: ShardBlockView[] @@ -38,10 +36,55 @@ export interface BlockFetcher { } export interface ShardConfig { - /** Max encoded shard size in bytes - default 512 KiB. */ - maxSize: number - /** Max key length (in UTF-8 encoded characters) - default 64. */ - maxKeyLength: number + /** Shard compatibility version. */ + version: number + /** + * Characters allowed in keys, referring to a known character set. + * e.g. "ascii" refers to the printable ASCII characters in the code range 32-126. + */ + keyChars: string + /** Max key size in bytes - default 4096 bytes. */ + maxKeySize: number + /** The key prefix from the root to this shard. */ + prefix: string } export type ShardOptions = Partial + +export interface KeyPrefixOption { + /** Filter results to entries with keys prefixed with this string. */ + prefix: string +} + +export type KeyRangeOption = + | KeyLowerBoundRangeOption + | KeyUpperBoundRangeOption + | (KeyLowerBoundRangeOption & KeyUpperBoundRangeOption) + +export type KeyLowerBoundRangeOption = + | KeyLowerBoundRangeExclusiveOption + | KeyLowerBoundRangeInclusiveOption + +export interface KeyLowerBoundRangeExclusiveOption { + gt: string +} + +export interface KeyLowerBoundRangeInclusiveOption { + gte: string +} + +export type KeyUpperBoundRangeOption = + | KeyUpperBoundRangeExclusiveOption + | KeyUpperBoundRangeInclusiveOption + +export interface KeyUpperBoundRangeExclusiveOption { + lt: string +} + +export interface KeyUpperBoundRangeInclusiveOption { + lte: string +} + +export type EntriesOptions = + | KeyPrefixOption + | KeyRangeOption diff --git a/src/batch/api.ts b/src/batch/api.ts index 8051c537..9a1db884 100644 --- a/src/batch/api.ts +++ b/src/batch/api.ts @@ -28,13 +28,11 @@ export { export interface BatcherShard extends ShardConfig { base?: ShardBlockView - prefix: string entries: BatcherShardEntry[] } export interface BatcherShardInit extends ShardOptions { base?: ShardBlockView - prefix?: string entries?: BatcherShardEntry[] } diff --git a/src/batch/index.js b/src/batch/index.js index 806bc03c..d0b37260 100644 --- a/src/batch/index.js +++ b/src/batch/index.js @@ -1,6 +1,6 @@ // eslint-disable-next-line no-unused-vars import * as API from './api.js' -import { ShardFetcher } from '../shard.js' +import { ShardFetcher, isPrintableASCII } from '../shard.js' import * as Shard from '../shard.js' import * as BatcherShard from './shard.js' @@ -13,17 +13,19 @@ class Batcher { * @param {API.BlockFetcher} init.blocks Block storage. * @param {API.BatcherShardEntry[]} init.entries The entries in this shard. * @param {string} init.prefix Key prefix. - * @param {number} init.maxSize - * @param {number} init.maxKeyLength + * @param {number} init.version Shard compatibility version. + * @param {string} init.keyChars Characters allowed in keys, referring to a known character set. + * @param {number} init.maxKeySize Max key size in bytes. * @param {API.ShardBlockView} init.base Original shard this batcher is based on. */ - constructor ({ blocks, entries, prefix, maxSize, maxKeyLength, base }) { + constructor ({ blocks, entries, prefix, version, keyChars, maxKeySize, base }) { this.blocks = blocks this.prefix = prefix this.entries = [...entries] this.base = base - this.maxSize = maxSize - this.maxKeyLength = maxKeyLength + this.version = version + this.keyChars = keyChars + this.maxKeySize = maxKeySize } /** @@ -46,12 +48,11 @@ class Batcher { * @param {object} init * @param {API.BlockFetcher} init.blocks Block storage. * @param {API.ShardLink} init.link CID of the shard block. - * @param {string} init.prefix */ - static async create ({ blocks, link, prefix }) { + static async create ({ blocks, link }) { const shards = new ShardFetcher(blocks) const base = await shards.get(link) - return new Batcher({ blocks, prefix, base, ...base.value }) + return new Batcher({ blocks, base, ...base.value }) } } @@ -63,95 +64,105 @@ class Batcher { * @returns {Promise} */ export const put = async (blocks, shard, key, value) => { + if (shard.keyChars !== Shard.KeyCharsASCII) { + throw new Error(`unsupported key character set: ${shard.keyChars}`) + } + if (!isPrintableASCII(key)) { + throw new Error('key contains non-ASCII characters') + } + // ensure utf8 encoded key is smaller than max + if (new TextEncoder().encode(key).length > shard.maxKeySize) { + throw new Error(`UTF-8 encoded key exceeds max size of ${shard.maxKeySize} bytes`) + } + const shards = new ShardFetcher(blocks) - const dest = await traverse(shards, key, shard) + const dest = await traverse(shards, shard, key) if (dest.shard !== shard) { shard = dest.shard key = dest.key } /** @type {API.BatcherShardEntry} */ - let entry = [key, value] - /** @type {API.BatcherShard|undefined} */ - let batcher - - // if the key in this shard is longer than allowed, then we need to make some - // intermediate shards. - if (key.length > shard.maxKeyLength) { - const pfxskeys = Array.from(Array(Math.ceil(key.length / shard.maxKeyLength)), (_, i) => { - const start = i * shard.maxKeyLength - return { - prefix: shard.prefix + key.slice(0, start), - key: key.slice(start, start + shard.maxKeyLength) - } - }) - - entry = [pfxskeys[pfxskeys.length - 1].key, value] - batcher = BatcherShard.create({ - entries: [entry], - prefix: pfxskeys[pfxskeys.length - 1].prefix, - ...Shard.configure(shard) - }) - - for (let i = pfxskeys.length - 2; i > 0; i--) { - entry = [pfxskeys[i].key, [batcher]] - batcher = BatcherShard.create({ - entries: [entry], - prefix: pfxskeys[i].prefix, - ...Shard.configure(shard) - }) + let entry = [dest.key, value] + let targetEntries = [...dest.shard.entries] + + for (const [i, e] of targetEntries.entries()) { + const [k, v] = e + + // is this just a replace? + if (k === dest.key) break + + // do we need to shard this entry? + const shortest = k.length < dest.key.length ? k : dest.key + const other = shortest === k ? dest.key : k + let common = '' + for (const char of shortest) { + const next = common + char + if (!other.startsWith(next)) break + common = next } + if (common.length) { + /** @type {API.ShardEntry[]} */ + let entries = [] - entry = [pfxskeys[0].key, [batcher]] - } + // if the existing entry key or new key is equal to the common prefix, + // then the existing value / new value needs to persist in the parent + // shard. Otherwise they persist in this new shard. + if (common !== dest.key) { + entries = Shard.putEntry(entries, [dest.key.slice(common.length), value]) + } + if (common !== k) { + entries = Shard.putEntry(entries, asShardEntry([k.slice(common.length), v])) + } - shard.entries = Shard.putEntry(asShardEntries(shard.entries), asShardEntry(entry)) - - // TODO: adjust size automatically - const size = BatcherShard.encodedLength(shard) - if (size > shard.maxSize) { - const common = Shard.findCommonPrefix( - asShardEntries(shard.entries), - entry[0] - ) - if (!common) throw new Error('shard limit reached') - const { prefix } = common - /** @type {API.BatcherShardEntry[]} */ - const matches = common.matches - - const entries = matches - .filter(m => m[0] !== prefix) - .map(m => { - m = [...m] - m[0] = m[0].slice(prefix.length) - return m + let child = BatcherShard.create({ + ...Shard.configure(dest.shard), + prefix: dest.shard.prefix + common, + entries }) + + // need to spread as access by index does not consider utf-16 surrogates + const commonChars = [...common] - const batcher = BatcherShard.create({ - entries, - prefix: shard.prefix + prefix, - ...Shard.configure(shard) - }) - - /** @type {API.ShardEntryShardValue | API.ShardEntryShardAndValueValue} */ - let value - const pfxmatch = matches.find(m => m[0] === prefix) - if (pfxmatch) { - if (Array.isArray(pfxmatch[1])) { - // should not happen! all entries with this prefix should have been - // placed within this shard already. - throw new Error(`expected "${prefix}" to be a shard value but found a shard link`) + // create parent shards for each character of the common prefix + for (let i = commonChars.length - 1; i > 0; i--) { + /** @type {API.ShardEntryShardValue | API.ShardEntryShardAndValueValue} */ + let parentValue + // if the first iteration and the existing entry key is equal to the + // common prefix, then existing value needs to persist in this parent + if (i === commonChars.length - 1 && common === k) { + if (Array.isArray(v)) throw new Error('found a shard link when expecting a value') + parentValue = [child, v] + } else if (i === commonChars.length - 1 && common === dest.key) { + parentValue = [child, value] + } else { + parentValue = [child] + } + const parent = BatcherShard.create({ + ...Shard.configure(dest.shard), + prefix: dest.shard.prefix + commonChars.slice(0, i).join(''), + entries: [[commonChars[i], parentValue]] + }) + child = parent } - value = [batcher, pfxmatch[1]] - } else { - value = [batcher] - } - shard.entries = Shard.putEntry( - asShardEntries(shard.entries.filter(e => matches.every(m => e[0] !== m[0]))), - asShardEntry([prefix, value]) - ) + // remove the sharded entry + targetEntries.splice(i, 1) + + // create the entry that will be added to target + if (commonChars.length === 1 && common === k) { + if (Array.isArray(v)) throw new Error('found a shard link when expecting a value') + entry = [commonChars[0], [child, v]] + } else if (commonChars.length === 1 && common === dest.key) { + entry = [commonChars[0], [child, value]] + } else { + entry = [commonChars[0], [child]] + } + break + } } + + shard.entries = Shard.putEntry(asShardEntries(targetEntries), asShardEntry(entry)) } /** @@ -159,22 +170,22 @@ export const put = async (blocks, shard, key, value) => { * key. * * @param {ShardFetcher} shards - * @param {string} key * @param {API.BatcherShard} shard + * @param {string} key * @returns {Promise<{ shard: API.BatcherShard, key: string }>} */ -export const traverse = async (shards, key, shard) => { +export const traverse = async (shards, shard, key) => { for (let i = 0; i < shard.entries.length; i++) { const [k, v] = shard.entries[i] if (key <= k) break if (key.startsWith(k) && Array.isArray(v)) { if (Shard.isShardLink(v[0])) { - const blk = await shards.get(v[0], shard.prefix + k) - const batcher = BatcherShard.create({ base: blk, prefix: blk.prefix, ...blk.value }) + const blk = await shards.get(v[0]) + const batcher = BatcherShard.create({ base: blk, ...blk.value }) shard.entries[i] = [k, v[1] == null ? [batcher] : [batcher, v[1]]] - return traverse(shards, key.slice(k.length), batcher) + return traverse(shards, batcher, key.slice(k.length)) } - return traverse(shards, key.slice(k.length), v[0]) + return traverse(shards, v[0], key.slice(k.length)) } } return { shard, key } @@ -208,7 +219,7 @@ export const commit = async shard => { } } - const block = await Shard.encodeBlock(Shard.withEntries(entries, shard), shard.prefix) + const block = await Shard.encodeBlock(Shard.withEntries(entries, shard)) additions.push(block) if (shard.base && shard.base.cid.toString() === block.cid.toString()) { @@ -231,7 +242,7 @@ const asShardEntry = entry => /** @type {API.ShardEntry} */ (entry) * @param {API.ShardLink} root CID of the root shard block. * @returns {Promise} */ -export const create = (blocks, root) => Batcher.create({ blocks, link: root, prefix: '' }) +export const create = (blocks, root) => Batcher.create({ blocks, link: root }) export class BatchCommittedError extends Error { /** diff --git a/src/batch/shard.js b/src/batch/shard.js index bb19d709..d90aea22 100644 --- a/src/batch/shard.js +++ b/src/batch/shard.js @@ -1,65 +1,13 @@ // eslint-disable-next-line no-unused-vars -import * as Link from 'multiformats/link' -import { tokensToLength } from 'cborg/length' -import { Token, Type } from 'cborg' import * as API from './api.js' import { configure } from '../shard.js' -/** Byte length of a v1, dag-cbor, sha-256 CID */ -const ShardLinkByteLength = 36 - -const CID_TAG = new Token(Type.tag, 42) - /** * @param {API.BatcherShardInit} [init] * @returns {API.BatcherShard} */ export const create = init => ({ base: init?.base, - prefix: init?.prefix ?? '', entries: [...init?.entries ?? []], ...configure(init) }) - -/** @param {API.BatcherShard} shard */ -export const encodedLength = (shard) => { - let entriesLength = 0 - for (const entry of shard.entries) { - entriesLength += entryEncodedLength(entry) - } - const tokens = [ - new Token(Type.map, 3), - new Token(Type.string, 'entries'), - new Token(Type.array, shard.entries.length), - new Token(Type.string, 'maxKeyLength'), - new Token(Type.uint, shard.maxKeyLength), - new Token(Type.string, 'maxSize'), - new Token(Type.uint, shard.maxSize) - ] - return tokensToLength(tokens) + entriesLength -} - -/** @param {API.BatcherShardEntry} entry */ -const entryEncodedLength = entry => { - const tokens = [ - new Token(Type.array, entry.length), - new Token(Type.string, entry[0]) - ] - if (Array.isArray(entry[1])) { - tokens.push(new Token(Type.array, entry[1].length)) - for (const item of entry[1]) { - tokens.push(CID_TAG) - if (Link.isLink(item)) { - tokens.push(new Token(Type.bytes, { length: item.byteLength + 1 })) - } else { - // `item is BatcherShard and does not have a CID yet, however, when it - // does, it will be this long. - tokens.push(new Token(Type.bytes, { length: ShardLinkByteLength + 1 })) - } - } - } else { - tokens.push(CID_TAG) - tokens.push(new Token(Type.bytes, { length: entry[1].byteLength + 1 })) - } - return tokensToLength(tokens) -} diff --git a/src/crdt/api.ts b/src/crdt/api.ts index c8b25397..b3783df7 100644 --- a/src/crdt/api.ts +++ b/src/crdt/api.ts @@ -1,7 +1,7 @@ import { ShardDiff, ShardLink, UnknownLink } from '../api.js' import { EventLink, EventBlockView } from '../clock/api.js' -export { BlockFetcher, UnknownLink, ShardBlockView, ShardDiff, ShardLink } from '../api.js' +export { BlockFetcher, UnknownLink, ShardBlockView, ShardDiff, ShardLink, EntriesOptions } from '../api.js' export { EventBlockView, EventLink } from '../clock/api.js' export interface Result extends ShardDiff { diff --git a/src/crdt/batch/api.ts b/src/crdt/batch/api.ts index 52d1cfc7..8f93d4e8 100644 --- a/src/crdt/batch/api.ts +++ b/src/crdt/batch/api.ts @@ -1,7 +1,6 @@ import { Batcher, BatcherShardEntry, - ShardDiff, ShardBlockView, BlockFetcher, ShardLink, diff --git a/src/crdt/batch/index.js b/src/crdt/batch/index.js index 31ecd8e3..7412fd94 100644 --- a/src/crdt/batch/index.js +++ b/src/crdt/batch/index.js @@ -21,20 +21,22 @@ class Batcher { * @param {API.EventLink[]} init.head Merkle clock head. * @param {API.BatcherShardEntry[]} init.entries The entries in this shard. * @param {string} init.prefix Key prefix. - * @param {number} init.maxSize - * @param {number} init.maxKeyLength + * @param {number} init.version Shard compatibility version. + * @param {string} init.keyChars Characters allowed in keys, referring to a known character set. + * @param {number} init.maxKeySize Max key size in bytes. * @param {API.ShardBlockView} init.base Original shard this batcher is based on. * @param {API.ShardBlockView[]} init.additions Additions to include in the committed batch. * @param {API.ShardBlockView[]} init.removals Removals to include in the committed batch. */ - constructor ({ blocks, head, entries, prefix, maxSize, maxKeyLength, base, additions, removals }) { + constructor ({ blocks, head, entries, prefix, version, keyChars, maxKeySize, base, additions, removals }) { this.blocks = blocks this.head = head this.prefix = prefix this.entries = [...entries] this.base = base - this.maxSize = maxSize - this.maxKeyLength = maxKeyLength + this.version = version + this.keyChars = keyChars + this.maxKeySize = maxKeySize this.additions = additions this.removals = removals /** @type {API.BatchOperation['ops']} */ @@ -107,9 +109,8 @@ class Batcher { * @param {object} init * @param {API.BlockFetcher} init.blocks Block storage. * @param {API.EventLink[]} init.head Merkle clock head. - * @param {string} init.prefix */ - static async create ({ blocks, head, prefix }) { + static async create ({ blocks, head }) { const mblocks = new MemoryBlockstore() blocks = new MultiBlockFetcher(mblocks, blocks) @@ -120,7 +121,6 @@ class Batcher { blocks, head, entries: [], - prefix, base, additions: [base], removals: [], @@ -139,7 +139,6 @@ class Batcher { blocks, head, entries: base.value.entries, - prefix, base, additions, removals, @@ -153,4 +152,4 @@ class Batcher { * @param {API.EventLink[]} head Merkle clock head. * @returns {Promise} */ -export const create = (blocks, head) => Batcher.create({ blocks, head, prefix: '' }) +export const create = (blocks, head) => Batcher.create({ blocks, head }) diff --git a/src/crdt/index.js b/src/crdt/index.js index fa4e28de..e5b8ded9 100644 --- a/src/crdt/index.js +++ b/src/crdt/index.js @@ -228,8 +228,7 @@ export const get = async (blocks, head, key) => { /** * @param {API.BlockFetcher} blocks Bucket block storage. * @param {API.EventLink[]} head Merkle clock head. - * @param {object} [options] - * @param {string} [options.prefix] + * @param {API.EntriesOptions} [options] */ export const entries = async function * (blocks, head, options) { if (!head.length) return diff --git a/src/diff.js b/src/diff.js index 15b07eec..bd9dd456 100644 --- a/src/diff.js +++ b/src/diff.js @@ -18,11 +18,11 @@ import { ShardFetcher } from './shard.js' * @param {API.ShardLink} b Comparison DAG. * @returns {Promise} */ -export const difference = async (blocks, a, b, prefix = '') => { +export const difference = async (blocks, a, b) => { if (isEqual(a, b)) return { keys: [], shards: { additions: [], removals: [] } } const shards = new ShardFetcher(blocks) - const [ashard, bshard] = await Promise.all([shards.get(a, prefix), shards.get(b, prefix)]) + const [ashard, bshard] = await Promise.all([shards.get(a), shards.get(b)]) const aents = new Map(ashard.value.entries) const bents = new Map(bshard.value.entries) @@ -36,19 +36,19 @@ export const difference = async (blocks, a, b, prefix = '') => { const bval = bents.get(akey) if (bval) continue if (!Array.isArray(aval)) { - keys.set(`${ashard.prefix}${akey}`, [aval, null]) + keys.set(`${ashard.value.prefix}${akey}`, [aval, null]) continue } // if shard link _with_ value if (aval[1] != null) { - keys.set(`${ashard.prefix}${akey}`, [aval[1], null]) + keys.set(`${ashard.value.prefix}${akey}`, [aval[1], null]) } - for await (const s of collect(shards, aval[0], `${ashard.prefix}${akey}`)) { + for await (const s of collect(shards, aval[0])) { for (const [k, v] of s.value.entries) { if (!Array.isArray(v)) { - keys.set(`${s.prefix}${k}`, [v, null]) + keys.set(`${s.value.prefix}${k}`, [v, null]) } else if (v[1] != null) { - keys.set(`${s.prefix}${k}`, [v[1], null]) + keys.set(`${s.value.prefix}${k}`, [v[1], null]) } } removals.set(s.cid.toString(), s) @@ -60,22 +60,22 @@ export const difference = async (blocks, a, b, prefix = '') => { const aval = aents.get(bkey) if (!Array.isArray(bval)) { if (!aval) { - keys.set(`${bshard.prefix}${bkey}`, [null, bval]) + keys.set(`${bshard.value.prefix}${bkey}`, [null, bval]) } else if (Array.isArray(aval)) { - keys.set(`${bshard.prefix}${bkey}`, [aval[1] ?? null, bval]) + keys.set(`${bshard.value.prefix}${bkey}`, [aval[1] ?? null, bval]) } else if (!isEqual(aval, bval)) { - keys.set(`${bshard.prefix}${bkey}`, [aval, bval]) + keys.set(`${bshard.value.prefix}${bkey}`, [aval, bval]) } continue } if (aval && Array.isArray(aval)) { // updated in B if (isEqual(aval[0], bval[0])) { if (bval[1] != null && (aval[1] == null || !isEqual(aval[1], bval[1]))) { - keys.set(`${bshard.prefix}${bkey}`, [aval[1] ?? null, bval[1]]) + keys.set(`${bshard.value.prefix}${bkey}`, [aval[1] ?? null, bval[1]]) } continue // updated value? } - const res = await difference(blocks, aval[0], bval[0], `${bshard.prefix}${bkey}`) + const res = await difference(blocks, aval[0], bval[0]) for (const shard of res.shards.additions) { additions.set(shard.cid.toString(), shard) } @@ -87,28 +87,28 @@ export const difference = async (blocks, a, b, prefix = '') => { } } else if (aval) { // updated in B value => link+value if (bval[1] == null) { - keys.set(`${bshard.prefix}${bkey}`, [aval, null]) + keys.set(`${bshard.value.prefix}${bkey}`, [aval, null]) } else if (!isEqual(aval, bval[1])) { - keys.set(`${bshard.prefix}${bkey}`, [aval, bval[1]]) + keys.set(`${bshard.value.prefix}${bkey}`, [aval, bval[1]]) } - for await (const s of collect(shards, bval[0], `${bshard.prefix}${bkey}`)) { + for await (const s of collect(shards, bval[0])) { for (const [k, v] of s.value.entries) { if (!Array.isArray(v)) { - keys.set(`${s.prefix}${k}`, [null, v]) + keys.set(`${s.value.prefix}${k}`, [null, v]) } else if (v[1] != null) { - keys.set(`${s.prefix}${k}`, [null, v[1]]) + keys.set(`${s.value.prefix}${k}`, [null, v[1]]) } } additions.set(s.cid.toString(), s) } } else { // added in B - keys.set(`${bshard.prefix}${bkey}`, [null, bval[0]]) - for await (const s of collect(shards, bval[0], `${bshard.prefix}${bkey}`)) { + keys.set(`${bshard.value.prefix}${bkey}`, [null, bval[0]]) + for await (const s of collect(shards, bval[0])) { for (const [k, v] of s.value.entries) { if (!Array.isArray(v)) { - keys.set(`${s.prefix}${k}`, [null, v]) + keys.set(`${s.value.prefix}${k}`, [null, v]) } else if (v[1] != null) { - keys.set(`${s.prefix}${k}`, [null, v[1]]) + keys.set(`${s.value.prefix}${k}`, [null, v[1]]) } } additions.set(s.cid.toString(), s) @@ -141,11 +141,11 @@ const isEqual = (a, b) => a.toString() === b.toString() * @param {API.ShardLink} root * @returns {AsyncIterableIterator} */ -async function * collect (shards, root, prefix = '') { - const shard = await shards.get(root, prefix) +async function * collect (shards, root) { + const shard = await shards.get(root) yield shard - for (const [k, v] of shard.value.entries) { + for (const [, v] of shard.value.entries) { if (!Array.isArray(v)) continue - yield * collect(shards, v[0], `${prefix}${k}`) + yield * collect(shards, v[0]) } } diff --git a/src/index.js b/src/index.js index 0c199f48..c98cf2cc 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,6 @@ // eslint-disable-next-line no-unused-vars import * as API from './api.js' -import { ShardFetcher } from './shard.js' +import { ShardFetcher, isPrintableASCII } from './shard.js' import * as Shard from './shard.js' /** @@ -16,81 +16,105 @@ import * as Shard from './shard.js' export const put = async (blocks, root, key, value) => { const shards = new ShardFetcher(blocks) const rshard = await shards.get(root) + + if (rshard.value.keyChars !== Shard.KeyCharsASCII) { + throw new Error(`unsupported key character set: ${rshard.value.keyChars}`) + } + if (!isPrintableASCII(key)) { + throw new Error('key contains non-ASCII characters') + } + // ensure utf8 encoded key is smaller than max + if (new TextEncoder().encode(key).length > rshard.value.maxKeySize) { + throw new Error(`UTF-8 encoded key exceeds max size of ${rshard.value.maxKeySize} bytes`) + } + const path = await traverse(shards, rshard, key) const target = path[path.length - 1] - const skey = key.slice(target.prefix.length) // key within the shard + const skey = key.slice(target.value.prefix.length) // key within the shard /** @type {API.ShardEntry} */ let entry = [skey, value] + let targetEntries = [...target.value.entries] /** @type {API.ShardBlockView[]} */ const additions = [] - // if the key in this shard is longer than allowed, then we need to make some - // intermediate shards. - if (skey.length > target.value.maxKeyLength) { - const pfxskeys = Array.from(Array(Math.ceil(skey.length / target.value.maxKeyLength)), (_, i) => { - const start = i * target.value.maxKeyLength - return { - prefix: target.prefix + skey.slice(0, start), - skey: skey.slice(start, start + target.value.maxKeyLength) - } - }) + for (const [i, e] of targetEntries.entries()) { + const [k, v] = e - let child = await Shard.encodeBlock( - Shard.withEntries([[pfxskeys[pfxskeys.length - 1].skey, value]], target.value), - pfxskeys[pfxskeys.length - 1].prefix - ) - additions.push(child) + // is this just a replace? + if (k === skey) break - for (let i = pfxskeys.length - 2; i > 0; i--) { - child = await Shard.encodeBlock( - Shard.withEntries([[pfxskeys[i].skey, [child.cid]]], target.value), - pfxskeys[i].prefix - ) - additions.push(child) + // do we need to shard this entry? + const shortest = k.length < skey.length ? k : skey + const other = shortest === k ? skey : k + let common = '' + for (const char of shortest) { + const next = common + char + if (!other.startsWith(next)) break + common = next } + if (common.length) { + /** @type {API.ShardEntry[]} */ + let entries = [] + + // if the existing entry key or new key is equal to the common prefix, + // then the existing value / new value needs to persist in the parent + // shard. Otherwise they persist in this new shard. + if (common !== skey) { + entries = Shard.putEntry(entries, [skey.slice(common.length), value]) + } + if (common !== k) { + entries = Shard.putEntry(entries, [k.slice(common.length), v]) + } - entry = [pfxskeys[0].skey, [child.cid]] - } + let child = await Shard.encodeBlock( + Shard.withEntries(entries, { ...target.value, prefix: target.value.prefix + common }) + ) + additions.push(child) + + // need to spread as access by index does not consider utf-16 surrogates + const commonChars = [...common] + + // create parent shards for each character of the common prefix + for (let i = commonChars.length - 1; i > 0; i--) { + const parentConfig = { ...target.value, prefix: target.value.prefix + commonChars.slice(0, i).join('') } + /** @type {API.ShardEntryLinkValue | API.ShardEntryValueValue | API.ShardEntryLinkAndValueValue} */ + let parentValue + // if the first iteration and the existing entry key is equal to the + // common prefix, then existing value needs to persist in this parent + if (i === commonChars.length - 1 && common === k) { + if (Array.isArray(v)) throw new Error('found a shard link when expecting a value') + parentValue = [child.cid, v] + } else if (i === commonChars.length - 1 && common === skey) { + parentValue = [child.cid, value] + } else { + parentValue = [child.cid] + } + const parent = await Shard.encodeBlock(Shard.withEntries([[commonChars[i], parentValue]], parentConfig)) + additions.push(parent) + child = parent + } - let shard = Shard.withEntries(Shard.putEntry(target.value.entries, entry), target.value) - let child = await Shard.encodeBlock(shard, target.prefix) - - if (child.bytes.length > shard.maxSize) { - const common = Shard.findCommonPrefix(shard.entries, entry[0]) - if (!common) throw new Error('shard limit reached') - const { prefix, matches } = common - const block = await Shard.encodeBlock( - Shard.withEntries( - matches - .filter(([k]) => k !== prefix) - .map(([k, v]) => [k.slice(prefix.length), v]), - shard - ), - target.prefix + prefix - ) - additions.push(block) - - /** @type {API.ShardEntryLinkValue | API.ShardEntryLinkAndValueValue} */ - let value - const pfxmatch = matches.find(([k]) => k === prefix) - if (pfxmatch) { - if (Array.isArray(pfxmatch[1])) { - // should not happen! all entries with this prefix should have been - // placed within this shard already. - throw new Error(`expected "${prefix}" to be a shard value but found a shard link`) + // remove the sharded entry + targetEntries.splice(i, 1) + + // create the entry that will be added to target + if (commonChars.length === 1 && common === k) { + if (Array.isArray(v)) throw new Error('found a shard link when expecting a value') + entry = [commonChars[0], [child.cid, v]] + } else if (commonChars.length === 1 && common === skey) { + entry = [commonChars[0], [child.cid, value]] + } else { + entry = [commonChars[0], [child.cid]] } - value = [block.cid, pfxmatch[1]] - } else { - value = [block.cid] + break } - - shard.entries = shard.entries.filter(e => matches.every(m => e[0] !== m[0])) - shard = Shard.withEntries(Shard.putEntry(shard.entries, [prefix, value]), shard) - child = await Shard.encodeBlock(shard, target.prefix) } + const shard = Shard.withEntries(Shard.putEntry(targetEntries, entry), target.value) + let child = await Shard.encodeBlock(shard) + // if no change in the target then we're done if (child.cid.toString() === target.cid.toString()) { return { root, additions: [], removals: [] } @@ -98,10 +122,10 @@ export const put = async (blocks, root, key, value) => { additions.push(child) - // path is root -> shard, so work backwards, propagating the new shard CID + // path is root -> target, so work backwards, propagating the new shard CID for (let i = path.length - 2; i >= 0; i--) { const parent = path[i] - const key = child.prefix.slice(parent.prefix.length) + const key = child.value.prefix.slice(parent.value.prefix.length) const value = Shard.withEntries( parent.value.entries.map((entry) => { const [k, v] = entry @@ -112,7 +136,7 @@ export const put = async (blocks, root, key, value) => { parent.value ) - child = await Shard.encodeBlock(value, parent.prefix) + child = await Shard.encodeBlock(value) additions.push(child) } @@ -133,7 +157,7 @@ export const get = async (blocks, root, key) => { const rshard = await shards.get(root) const path = await traverse(shards, rshard, key) const target = path[path.length - 1] - const skey = key.slice(target.prefix.length) // key within the shard + const skey = key.slice(target.value.prefix.length) // key within the shard const entry = target.value.entries.find(([k]) => k === skey) if (!entry) return return Array.isArray(entry[1]) ? entry[1][1] : entry[1] @@ -153,7 +177,7 @@ export const del = async (blocks, root, key) => { const rshard = await shards.get(root) const path = await traverse(shards, rshard, key) const target = path[path.length - 1] - const skey = key.slice(target.prefix.length) // key within the shard + const skey = key.slice(target.value.prefix.length) // key within the shard const entryidx = target.value.entries.findIndex(([k]) => k === skey) if (entryidx === -1) return { root, additions: [], removals: [] } @@ -192,13 +216,13 @@ export const del = async (blocks, root, key) => { } } - let child = await Shard.encodeBlock(shard, path[path.length - 1].prefix) + let child = await Shard.encodeBlock(shard) additions.push(child) // path is root -> shard, so work backwards, propagating the new shard CID for (let i = path.length - 2; i >= 0; i--) { const parent = path[i] - const key = child.prefix.slice(parent.prefix.length) + const key = child.value.prefix.slice(parent.value.prefix.length) const value = Shard.withEntries( parent.value.entries.map((entry) => { const [k, v] = entry @@ -209,26 +233,86 @@ export const del = async (blocks, root, key) => { parent.value ) - child = await Shard.encodeBlock(value, parent.prefix) + child = await Shard.encodeBlock(value) additions.push(child) } return { root: additions[additions.length - 1].cid, additions, removals } } +/** + * @param {API.EntriesOptions} [options] + * @returns {options is API.KeyPrefixOption} + */ +const isKeyPrefixOption = options => { + const opts = options ?? {} + return 'prefix' in opts && Boolean(opts.prefix) +} + +/** + * @param {API.EntriesOptions} [options] + * @returns {options is API.KeyRangeOption} + */ +const isKeyRangeOption = options => { + const opts = options ?? {} + return ('gt' in opts && Boolean(opts.gt)) || ('gte' in opts && Boolean(opts.gte)) || ('lt' in opts && Boolean(opts.lt)) || ('lte' in opts && Boolean(opts.lte)) +} + +/** + * @param {API.KeyRangeOption} options + * @returns {options is API.KeyLowerBoundRangeOption} + */ +const isKeyLowerBoundRangeOption = options => ('gt' in options && Boolean(options.gt)) || ('gte' in options && Boolean(options.gte)) + +/** + * @param {API.KeyLowerBoundRangeOption} options + * @returns {options is API.KeyLowerBoundRangeInclusiveOption} + */ +const isKeyLowerBoundRangeInclusiveOption = options => 'gte' in options && Boolean(options.gte) + +/** + * @param {API.KeyLowerBoundRangeOption} options + * @returns {options is API.KeyLowerBoundRangeExclusiveOption} + */ +const isKeyLowerBoundRangeExclusiveOption = options => 'gt' in options && Boolean(options.gt) + +/** + * @param {API.KeyRangeOption} options + * @returns {options is API.KeyUpperBoundRangeOption} + */ +const isKeyUpperBoundRangeOption = options => ('lt' in options && Boolean(options.lt)) || ('lte' in options && Boolean(options.lte)) + +/** + * @param {API.KeyUpperBoundRangeOption} options + * @returns {options is API.KeyUpperBoundRangeInclusiveOption} + */ +const isKeyUpperBoundRangeInclusiveOption = options => 'lte' in options && Boolean(options.lte) + +/** + * @param {API.KeyUpperBoundRangeOption} options + * @returns {options is API.KeyUpperBoundRangeExclusiveOption} + */ +const isKeyUpperBoundRangeExclusiveOption = options => 'lt' in options && Boolean(options.lt) + /** * List entries in the bucket. * * @param {API.BlockFetcher} blocks Bucket block storage. * @param {API.ShardLink} root CID of the root node of the bucket. - * @param {object} [options] - * @param {string} [options.prefix] Filter results to entries with keys prefixed with this string. - * @param {string} [options.gt] Filter results to entries with keys greater than this string. - * @param {string} [options.gte] Filter results to entries with keys greater than or equal to this string. + * @param {API.EntriesOptions} [options] * @returns {AsyncIterableIterator} */ -export const entries = async function * (blocks, root, options = {}) { - const { prefix, gt, gte } = options +export const entries = async function * (blocks, root, options) { + const hasKeyPrefix = isKeyPrefixOption(options) + const hasKeyRange = isKeyRangeOption(options) + const hasKeyLowerBoundRange = hasKeyRange && isKeyLowerBoundRangeOption(options) + const hasKeyLowerBoundRangeInclusive = hasKeyLowerBoundRange && isKeyLowerBoundRangeInclusiveOption(options) + const hasKeyLowerBoundRangeExclusive = hasKeyLowerBoundRange && isKeyLowerBoundRangeExclusiveOption(options) + const hasKeyUpperBoundRange = hasKeyRange && isKeyUpperBoundRangeOption(options) + const hasKeyUpperBoundRangeInclusive = hasKeyUpperBoundRange && isKeyUpperBoundRangeInclusiveOption(options) + const hasKeyUpperBoundRangeExclusive = hasKeyUpperBoundRange && isKeyUpperBoundRangeExclusiveOption(options) + const hasKeyUpperAndLowerBoundRange = hasKeyLowerBoundRange && hasKeyUpperBoundRange + const shards = new ShardFetcher(blocks) const rshard = await shards.get(root) @@ -236,49 +320,55 @@ export const entries = async function * (blocks, root, options = {}) { /** @returns {AsyncIterableIterator} */ async function * ents (shard) { for (const entry of shard.value.entries) { - const key = shard.prefix + entry[0] + const key = shard.value.prefix + entry[0] + // if array, this is a link to a shard if (Array.isArray(entry[1])) { if (entry[1][1]) { if ( - (prefix && key.startsWith(prefix)) || - (gt && key > gt) || - (gte && key >= gte) || - (!prefix && !gt && !gte) + (hasKeyPrefix && key.startsWith(options.prefix)) || + (hasKeyUpperAndLowerBoundRange && ( + ((hasKeyLowerBoundRangeExclusive && key > options.gt) || (hasKeyLowerBoundRangeInclusive && key >= options.gte)) && + ((hasKeyUpperBoundRangeExclusive && key < options.lt) || (hasKeyUpperBoundRangeInclusive && key <= options.lte)) + )) || + (hasKeyLowerBoundRangeExclusive && key > options.gt) || + (hasKeyLowerBoundRangeInclusive && key >= options.gte) || + (hasKeyUpperBoundRangeExclusive && key < options.lt) || + (hasKeyUpperBoundRangeInclusive && key <= options.lte) || + (!hasKeyPrefix && !hasKeyRange) ) { yield [key, entry[1][1]] } } - if (prefix) { - if (prefix.length <= key.length && !key.startsWith(prefix)) { - continue - } - if (prefix.length > key.length && !prefix.startsWith(key)) { - continue - } - } else if (gt) { - if (gt.length <= key.length && !key.startsWith(gt)) { + if (hasKeyPrefix) { + if (options.prefix.length <= key.length && !key.startsWith(options.prefix)) { continue } - if (gt.length > key.length && !gt.startsWith(key)) { - continue - } - } else if (gte) { - if (gte.length <= key.length && !key.startsWith(gte)) { - continue - } - if (gte.length > key.length && !gte.startsWith(key)) { + if (options.prefix.length > key.length && !options.prefix.startsWith(key)) { continue } + } else if ( + (hasKeyLowerBoundRangeExclusive && (trunc(key, Math.min(key.length, options.gt.length)) < trunc(options.gt, Math.min(key.length, options.gt.length)))) || + (hasKeyLowerBoundRangeInclusive && (trunc(key, Math.min(key.length, options.gte.length)) < trunc(options.gte, Math.min(key.length, options.gte.length)))) || + (hasKeyUpperBoundRangeExclusive && (trunc(key, Math.min(key.length, options.lt.length)) > trunc(options.lt, Math.min(key.length, options.lt.length)))) || + (hasKeyUpperBoundRangeInclusive && (trunc(key, Math.min(key.length, options.lte.length)) > trunc(options.lte, Math.min(key.length, options.lte.length)))) + ) { + continue } - yield * ents(await shards.get(entry[1][0], key)) + yield * ents(await shards.get(entry[1][0])) } else { if ( - (prefix && key.startsWith(prefix)) || - (gt && key > gt) || - (gte && key >= gte) || - (!prefix && !gt && !gte) + (hasKeyPrefix && key.startsWith(options.prefix)) || + (hasKeyRange && hasKeyUpperAndLowerBoundRange && ( + ((hasKeyLowerBoundRangeExclusive && key > options.gt) || (hasKeyLowerBoundRangeInclusive && key >= options.gte)) && + ((hasKeyUpperBoundRangeExclusive && key < options.lt) || (hasKeyUpperBoundRangeInclusive && key <= options.lte)) + )) || + (hasKeyRange && !hasKeyUpperAndLowerBoundRange && ( + (hasKeyLowerBoundRangeExclusive && key > options.gt) || (hasKeyLowerBoundRangeInclusive && key >= options.gte) || + (hasKeyUpperBoundRangeExclusive && key < options.lt) || (hasKeyUpperBoundRangeInclusive && key <= options.lte) + )) || + (!hasKeyPrefix && !hasKeyRange) ) { yield [key, entry[1]] } @@ -288,6 +378,12 @@ export const entries = async function * (blocks, root, options = {}) { )(rshard) } +/** + * @param {string} str + * @param {number} len + */ +const trunc = (str, len) => str.length <= len ? str : str.slice(0, len) + /** * Traverse from the passed shard block to the target shard block using the * passed key. All traversed shards are returned, starting with the passed @@ -302,7 +398,7 @@ const traverse = async (shards, shard, key) => { for (const [k, v] of shard.value.entries) { if (key === k) return [shard] if (key.startsWith(k) && Array.isArray(v)) { - const path = await traverse(shards, await shards.get(v[0], shard.prefix + k), key.slice(k.length)) + const path = await traverse(shards, await shards.get(v[0]), key.slice(k.length)) return [shard, ...path] } } diff --git a/src/shard.js b/src/shard.js index b6f0589c..0796d8e3 100644 --- a/src/shard.js +++ b/src/shard.js @@ -2,15 +2,11 @@ import * as Link from 'multiformats/link' import { Block, encode, decode } from 'multiformats/block' import { sha256 } from 'multiformats/hashes/sha2' import * as dagCBOR from '@ipld/dag-cbor' -import { tokensToLength } from 'cborg/length' -import { Token, Type } from 'cborg' // eslint-disable-next-line no-unused-vars import * as API from './api.js' -export const MaxKeyLength = 64 -export const MaxShardSize = 512 * 1024 - -const CID_TAG = new Token(Type.tag, 42) +export const KeyCharsASCII = 'ascii' +export const MaxKeySize = 4096 /** * @extends {Block} @@ -22,12 +18,10 @@ export class ShardBlock extends Block { * @param {API.ShardLink} config.cid * @param {API.Shard} config.value * @param {Uint8Array} config.bytes - * @param {string} config.prefix */ - constructor ({ cid, value, bytes, prefix }) { + constructor ({ cid, value, bytes }) { // @ts-expect-error super({ cid, value, bytes }) - this.prefix = prefix } /** @param {API.ShardOptions} [options] */ @@ -47,8 +41,10 @@ export const create = (options) => ({ entries: [], ...configure(options) }) * @returns {API.ShardConfig} */ export const configure = (options) => ({ - maxSize: options?.maxSize ?? MaxShardSize, - maxKeyLength: options?.maxKeyLength ?? MaxKeyLength + version: 1, + keyChars: options?.keyChars ?? KeyCharsASCII, + maxKeySize: options?.maxKeySize ?? MaxKeySize, + prefix: options?.prefix ?? '' }) /** @@ -63,39 +59,39 @@ const decodeCache = new WeakMap() /** * @param {API.Shard} value - * @param {string} [prefix] * @returns {Promise} */ -export const encodeBlock = async (value, prefix) => { +export const encodeBlock = async value => { const { cid, bytes } = await encode({ value, codec: dagCBOR, hasher: sha256 }) - const block = new ShardBlock({ cid, value, bytes, prefix: prefix ?? '' }) + const block = new ShardBlock({ cid, value, bytes }) decodeCache.set(block.bytes, block) return block } /** * @param {Uint8Array} bytes - * @param {string} [prefix] * @returns {Promise} */ -export const decodeBlock = async (bytes, prefix) => { +export const decodeBlock = async bytes => { const block = decodeCache.get(bytes) if (block) return block const { cid, value } = await decode({ bytes, codec: dagCBOR, hasher: sha256 }) if (!isShard(value)) throw new Error(`invalid shard: ${cid}`) - return new ShardBlock({ cid, value, bytes, prefix: prefix ?? '' }) + return new ShardBlock({ cid, value, bytes }) } /** * @param {any} value * @returns {value is API.Shard} */ -export const isShard = (value) => +export const isShard = value => value != null && typeof value === 'object' && Array.isArray(value.entries) && - typeof value.maxSize === 'number' && - typeof value.maxKeyLength === 'number' + value.version === 1 && + typeof value.maxKeySize === 'number' && + typeof value.keyChars === 'string' && + typeof value.prefix === 'string' /** * @param {any} value @@ -113,13 +109,12 @@ export class ShardFetcher { /** * @param {API.ShardLink} link - * @param {string} [prefix] * @returns {Promise} */ - async get (link, prefix = '') { + async get (link) { const block = await this._blocks.get(link) if (!block) throw new Error(`missing block: ${link}`) - return decodeBlock(block.bytes, prefix) + return decodeBlock(block.bytes) } } @@ -177,72 +172,9 @@ export const putEntry = (target, newEntry) => { } entries.push(newEntry) - return entries -} - -/** - * @param {API.ShardEntry[]} entries - * @param {string} skey Shard key to use as a base. - */ -export const findCommonPrefix = (entries, skey) => { - const startidx = entries.findIndex(([k]) => skey === k) - if (startidx === -1) throw new Error(`key not found in shard: ${skey}`) - let i = startidx - /** @type {string} */ - let pfx - while (true) { - pfx = entries[i][0].slice(0, -1) - if (pfx.length) { - while (true) { - const matches = entries.filter(entry => entry[0].startsWith(pfx)) - if (matches.length > 1) return { prefix: pfx, matches } - pfx = pfx.slice(0, -1) - if (!pfx.length) break - } - } - i++ - if (i >= entries.length) { - i = 0 - } - if (i === startidx) { - return - } - } -} -/** @param {API.Shard} shard */ -export const encodedLength = (shard) => { - let entriesLength = 0 - for (const entry of shard.entries) { - entriesLength += entryEncodedLength(entry) - } - const tokens = [ - new Token(Type.map, 3), - new Token(Type.string, 'entries'), - new Token(Type.array, shard.entries.length), - new Token(Type.string, 'maxKeyLength'), - new Token(Type.uint, shard.maxKeyLength), - new Token(Type.string, 'maxSize'), - new Token(Type.uint, shard.maxSize) - ] - return tokensToLength(tokens) + entriesLength + return entries } -/** @param {API.ShardEntry} entry */ -const entryEncodedLength = entry => { - const tokens = [ - new Token(Type.array, entry.length), - new Token(Type.string, entry[0]) - ] - if (Array.isArray(entry[1])) { - tokens.push(new Token(Type.array, entry[1].length)) - for (const link of entry[1]) { - tokens.push(CID_TAG) - tokens.push(new Token(Type.bytes, { length: link.byteLength + 1 })) - } - } else { - tokens.push(CID_TAG) - tokens.push(new Token(Type.bytes, { length: entry[1].byteLength + 1 })) - } - return tokensToLength(tokens) -} +/** @param {string} s */ +export const isPrintableASCII = s => /^[\x20-\x7E]*$/.test(s) diff --git a/test/batch.test.js b/test/batch.test.js index e5d2625b..1a06dccd 100644 --- a/test/batch.test.js +++ b/test/batch.test.js @@ -8,8 +8,8 @@ import * as Batch from '../src/batch/index.js' import { Blockstore, randomCID, randomString, vis } from './helpers.js' describe('batch', () => { - it('batches puts (shard on key length)', async () => { - const rootblk = await ShardBlock.create({ maxKeyLength: 4 }) + it('batches puts', async () => { + const rootblk = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(rootblk.cid, rootblk.bytes) @@ -41,43 +41,8 @@ describe('batch', () => { } }) - it('batches puts (shard on max size)', async () => { - const rootblk = await ShardBlock.create({ maxSize: 2000 }) - const blocks = new Blockstore() - await blocks.put(rootblk.cid, rootblk.bytes) - - const ops = [] - for (let i = 0; i < 1000; i++) { - ops.push({ type: 'put', key: `test${randomString(10)}`, value: await randomCID() }) - } - - const batch = await Batch.create(blocks, rootblk.cid) - for (const op of ops) { - await batch.put(op.key, op.value) - } - const { root, additions, removals } = await batch.commit() - - for (const b of removals) { - blocks.deleteSync(b.cid) - } - for (const b of additions) { - blocks.putSync(b.cid, b.bytes) - } - - assert.equal(removals.length, 1) - assert.equal(removals[0].cid.toString(), rootblk.cid.toString()) - - for (const o of ops) { - const value = await Pail.get(blocks, root, o.key) - assert(value) - assert.equal(value.toString(), o.value.toString()) - } - - vis(blocks, root) - }) - it('create the same DAG as non-batched puts', async () => { - const root = await ShardBlock.create({ maxKeyLength: 4 }) + const root = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(root.cid, root.bytes) @@ -106,7 +71,7 @@ describe('batch', () => { }) it('error when put after commit', async () => { - const root = await ShardBlock.create({ maxKeyLength: 4 }) + const root = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(root.cid, root.bytes) @@ -124,7 +89,7 @@ describe('batch', () => { }) it('error when commit after commit', async () => { - const root = await ShardBlock.create({ maxKeyLength: 4 }) + const root = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(root.cid, root.bytes) @@ -142,7 +107,7 @@ describe('batch', () => { }) it('traverses existing shards to put values', async () => { - const rootblk = await ShardBlock.create({ maxKeyLength: 4 }) + const rootblk = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(rootblk.cid, rootblk.bytes) diff --git a/test/crdt.test.js b/test/crdt.test.js index 22ce0444..0084d23d 100644 --- a/test/crdt.test.js +++ b/test/crdt.test.js @@ -323,10 +323,7 @@ class TestPail { return get(this.blocks, this.head, key) } - /** - * @param {object} [options] - * @param {string} [options.prefix] - */ + /** @param {API.EntriesOptions} [options] */ async * entries (options) { yield * entries(this.blocks, this.head, options) } diff --git a/test/entries.test.js b/test/entries.test.js index 60c446c6..2d3adf86 100644 --- a/test/entries.test.js +++ b/test/entries.test.js @@ -141,4 +141,107 @@ describe('entries', () => { assert.equal(results[i][0], key) } }) + + it('lists entries by key less than string', async () => { + const empty = await ShardBlock.create() + const blocks = new Blockstore() + await blocks.put(empty.cid, empty.bytes) + + /** @type {Array<[string, API.UnknownLink]>} */ + const testdata = [ + ['cccc', await randomCID(32)], + ['deee', await randomCID(32)], + ['dooo', await randomCID(32)], + ['beee', await randomCID(32)] + ] + + /** @type {API.ShardLink} */ + let root = empty.cid + for (const [k, v] of testdata) { + const res = await put(blocks, root, k, v) + for (const b of res.additions) { + await blocks.put(b.cid, b.bytes) + } + root = res.root + } + + const lt = 'doo' + const results = [] + for await (const entry of entries(blocks, root, { lt })) { + results.push(entry) + } + + for (const [i, key] of testdata.map(d => d[0]).filter(k => k < lt).sort().entries()) { + assert.equal(results[i][0], key) + } + }) + + it('lists entries by key less than or equal to string', async () => { + const empty = await ShardBlock.create() + const blocks = new Blockstore() + await blocks.put(empty.cid, empty.bytes) + + /** @type {Array<[string, API.UnknownLink]>} */ + const testdata = [ + ['cccc', await randomCID(32)], + ['deee', await randomCID(32)], + ['dooo', await randomCID(32)], + ['beee', await randomCID(32)] + ] + + /** @type {API.ShardLink} */ + let root = empty.cid + for (const [k, v] of testdata) { + const res = await put(blocks, root, k, v) + for (const b of res.additions) { + await blocks.put(b.cid, b.bytes) + } + root = res.root + } + + const lte = 'dooo' + const results = [] + for await (const entry of entries(blocks, root, { lte })) { + results.push(entry) + } + + for (const [i, key] of testdata.map(d => d[0]).filter(k => k <= lte).sort().entries()) { + assert.equal(results[i][0], key) + } + }) + + it('lists entries by key greater than and less than or equal to string', async () => { + const empty = await ShardBlock.create() + const blocks = new Blockstore() + await blocks.put(empty.cid, empty.bytes) + + /** @type {Array<[string, API.UnknownLink]>} */ + const testdata = [ + ['cccc', await randomCID(32)], + ['deee', await randomCID(32)], + ['dooo', await randomCID(32)], + ['beee', await randomCID(32)] + ] + + /** @type {API.ShardLink} */ + let root = empty.cid + for (const [k, v] of testdata) { + const res = await put(blocks, root, k, v) + for (const b of res.additions) { + await blocks.put(b.cid, b.bytes) + } + root = res.root + } + + const gt = 'c' + const lte = 'deee' + const results = [] + for await (const entry of entries(blocks, root, { gt, lte })) { + results.push(entry) + } + + for (const [i, key] of testdata.map(d => d[0]).filter(k => k > gt && k <= lte).sort().entries()) { + assert.equal(results[i][0], key) + } + }) }) diff --git a/test/helpers.js b/test/helpers.js index f26915bc..ad5c36f6 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -9,6 +9,7 @@ import archy from 'archy' import * as API from '../src/api.js' import { ShardFetcher, decodeBlock } from '../src/shard.js' import { MemoryBlockstore } from '../src/block.js' +import { entries, get, put } from '../src/index.js' /** * @param {number} min @@ -54,12 +55,11 @@ export async function randomBytes (size) { export class Blockstore extends MemoryBlockstore { /** * @param {import('../src/api.js').ShardLink} cid - * @param {string} [prefix] */ - async getShardBlock (cid, prefix) { + async getShardBlock (cid) { const blk = await this.get(cid) assert(blk) - return decodeBlock(blk.bytes, prefix) + return decodeBlock(blk.bytes) } } @@ -96,3 +96,60 @@ export const vis = async (blocks, root) => { console.log(archy(archyRoot)) } + +/** + * @param {API.BlockFetcher} blocks + * @param {API.ShardLink} root + */ +export const materialize = async (blocks, root) => { + const shards = new ShardFetcher(blocks) + const shard = await shards.get(root) + /** @type {any[]} */ + const entries = [] + for (const e of shard.value.entries) { + if (Array.isArray(e[1])) { + const v = [...e[1]] + // @ts-expect-error + v[0] = await materialize(blocks, e[1][0]) + entries.push([e[0], v]) + } else { + entries.push([...e]) + } + } + return entries +} + +/** + * @param {Blockstore} blocks + * @param {API.ShardLink} root + * @param {Array<[string, API.UnknownLink]>} items + */ +export const putAll = async (blocks, root, items) => { + for (const [k, v] of items) { + const res = await put(blocks, root, k, v) + for (const b of res.additions) { + blocks.putSync(b.cid, b.bytes) + } + for (const b of res.removals) { + blocks.deleteSync(b.cid) + } + root = res.root + } + return { root } +} + +/** + * @param {MemoryBlockstore} blocks + * @param {API.ShardLink} root + * @param {Map} data + */ +export const verify = async (blocks, root, data) => { + for (const [k, v] of data) { + const result = await get(blocks, root, k) + if (!result) throw new Error(`missing item: "${k}": ${v}`) + if (result.toString() !== v.toString()) throw new Error(`incorrect value for ${k}: ${result} !== ${v}`) + } + let total = 0 + for await (const _ of entries(blocks, root)) total++ + if (data.size !== total) throw new Error(`incorrect entry count: ${total} !== ${data.size}`) +} diff --git a/test/put.test.js b/test/put.test.js index 8467d70d..0647ea61 100644 --- a/test/put.test.js +++ b/test/put.test.js @@ -3,39 +3,9 @@ import assert from 'node:assert' import { nanoid } from 'nanoid' // eslint-disable-next-line no-unused-vars import * as API from '../src/api.js' -import { put, get } from '../src/index.js' -import { ShardBlock, MaxKeyLength } from '../src/shard.js' -import * as Shard from '../src/shard.js' -import { Blockstore, randomCID } from './helpers.js' - -const maxShardSize = 1024 // tiny shard size for testing - -/** - * Fill a shard until it exceeds the size limit. Returns the entry that will - * cause the limit to exceed. - * - * @param {API.Shard} shard - * @param {(i: number) => Promise} [mkentry] - */ -async function fillShard (shard, mkentry) { - mkentry = mkentry ?? (async () => [nanoid(), await randomCID(32)]) - let i = 0 - while (true) { - const entry = await mkentry(i) - const blk = await Shard.encodeBlock( - Shard.withEntries( - Shard.putEntry(shard.entries, entry), - shard - ) - ) - if (blk.bytes.length > shard.maxSize) return { shard, entry } - shard = Shard.withEntries( - Shard.putEntry(shard.entries, entry), - shard - ) - i++ - } -} +import { put } from '../src/index.js' +import { ShardBlock } from '../src/shard.js' +import { Blockstore, materialize, putAll, randomCID, randomInteger, randomString, verify } from './helpers.js' describe('put', () => { it('put to empty shard', async () => { @@ -80,80 +50,151 @@ describe('put', () => { assert.equal(result1.root.toString(), result0.root.toString()) }) - it('auto-shards on long key', async () => { + it('auto-shards on similar key', async () => { const root = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(root.cid, root.bytes) const dataCID = await randomCID(32) - const key = Array(MaxKeyLength + 1).fill('a').join('') - const result = await put(blocks, root.cid, key, dataCID) - assert.equal(result.removals.length, 1) - assert.equal(result.removals[0].cid.toString(), root.cid.toString()) - assert.equal(result.additions.length, 2) - assert.equal(result.additions[0].value.entries.length, 1) - assert.equal(result.additions[0].value.entries[0][0], key.slice(-1)) - assert.equal(result.additions[0].value.entries[0][1].toString(), dataCID.toString()) - assert.equal(result.additions[1].value.entries.length, 1) - assert.equal(result.additions[1].value.entries[0][0], key.slice(0, -1)) - assert.equal(result.additions[1].value.entries[0][1][0].toString(), result.additions[0].cid.toString()) + const res = await putAll(blocks, root.cid, [ + ['aaaa', dataCID], + ['aabb', dataCID] + ]) + + assert.deepEqual( + await materialize(blocks, res.root), + [ + [ + 'a', + [ + [ + [ + 'a', + [ + [ + ['aa', dataCID], + ['bb', dataCID] + ] + ] + ] + ] + ] + ] + ] + ) }) - it('auto-shards on super long key', async () => { + it('put to shard link', async () => { const root = await ShardBlock.create() const blocks = new Blockstore() await blocks.put(root.cid, root.bytes) const dataCID = await randomCID(32) - const key = Array(MaxKeyLength * 2 + 1).fill('b').join('') - const result = await put(blocks, root.cid, key, dataCID) - assert.equal(result.removals.length, 1) - assert.equal(result.removals[0].cid.toString(), root.cid.toString()) - assert.equal(result.additions.length, 3) - assert.equal(result.additions[0].value.entries.length, 1) - assert.equal(result.additions[0].value.entries[0][0], key.slice(-1)) - assert.equal(result.additions[0].value.entries[0][1].toString(), dataCID.toString()) - assert.equal(result.additions[1].value.entries.length, 1) - assert.equal(result.additions[1].value.entries[0][0], key.slice(MaxKeyLength, MaxKeyLength * 2)) - assert.equal(result.additions[1].value.entries[0][1][0].toString(), result.additions[0].cid.toString()) - assert.equal(result.additions[2].value.entries.length, 1) - assert.equal(result.additions[2].value.entries[0][0], key.slice(0, MaxKeyLength)) - assert.equal(result.additions[2].value.entries[0][1][0].toString(), result.additions[1].cid.toString()) + const res = await putAll(blocks, root.cid, [ + ['aaaa', dataCID], + ['aabb', dataCID], + ['aa', dataCID] + ]) + + assert.deepEqual( + await materialize(blocks, res.root), + [ + [ + 'a', + [ + [ + [ + 'a', + [ + [ + ['aa', dataCID], + ['bb', dataCID] + ], + dataCID + ] + ] + ] + ] + ] + ] + ) }) - // TODO: deep shard propagates to root - - it('shards at size limit', async () => { - const blocks = new Blockstore() - const pfx = 'test/' - const { shard, entry: [k, v] } = await fillShard(Shard.create({ maxSize: maxShardSize }), async () => { - return [pfx + nanoid(), await randomCID(1)] - }) - const rootblk0 = await Shard.encodeBlock(shard) - await blocks.put(rootblk0.cid, rootblk0.bytes) - - const { root, additions, removals } = await put(blocks, rootblk0.cid, k, v) - - assert.notEqual(root.toString(), rootblk0.cid.toString()) - assert.equal(removals.length, 1) - assert.equal(removals[0].cid.toString(), rootblk0.cid.toString()) - - for (const b of additions) { - await blocks.put(b.cid, b.bytes) + it('deterministic structure', async () => { + const dataCID = await randomCID(32) + /** @type {Array<[string, API.UnknownLink]>} */ + const items = [ + ['aaaa', dataCID], + ['aaab', dataCID], + ['aabb', dataCID], + ['abbb', dataCID], + ['bbbb', dataCID] + ] + const orders = [ + [0, 1, 2, 3, 4], + [4, 3, 2, 1, 0], + [1, 2, 4, 0, 3], + [2, 0, 3, 4, 1], + ] + for (const order of orders) { + const root = await ShardBlock.create() + const blocks = new Blockstore() + await blocks.put(root.cid, root.bytes) + + const res = await putAll(blocks, root.cid, order.map(i => items[i])) + + assert.deepEqual( + await materialize(blocks, res.root), + [ + [ + 'a', + [ + [ + [ + 'a', + [ + [ + [ + 'a', + [ + [ + ['a', dataCID], + ['b', dataCID] + ] + ] + ], + ['bb', dataCID] + ] + ] + ], + ['bbb', dataCID] + ] + ] + ], + ['bbbb', dataCID] + ] + ) } + }) - const rootblk1 = await blocks.getShardBlock(root) + it('put 10,000x', async function () { + this.timeout(1000 * 10) - const entry = rootblk1.value.entries.find(([, v]) => Array.isArray(v)) - assert(entry, 'should find a shard entry') - assert(entry[0].startsWith(pfx)) + const root = await ShardBlock.create() + const blocks = new Blockstore() + await blocks.put(root.cid, root.bytes) - for (const [k, v] of rootblk0.value.entries) { - const value = await get(blocks, rootblk1.cid, k) - assert(value) - assert.equal(value.toString(), v.toString()) + /** @type {Array<[string, API.UnknownLink]>} */ + const items = [] + for (let i = 0; i < 10_000; i++) { + const k = randomString(randomInteger(1, 64)) + const v = await randomCID(randomInteger(8, 128)) + items.push([k, v]) } + + const res = await putAll(blocks, root.cid, items) + await assert.doesNotReject(verify(blocks, res.root, new Map(items))) }) })