Skip to content

Commit

Permalink
Full in-memory database support (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
DallasHoff authored Nov 23, 2024
1 parent 58de444 commit 2722197
Show file tree
Hide file tree
Showing 18 changed files with 392 additions and 186 deletions.
80 changes: 47 additions & 33 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
Transaction,
DeleteMessage,
DatabasePath,
ExportMessage,
} from './types.js';
import { SQLocalProcessor } from './processor.js';
import { sqlTag } from './lib/sql-tag.js';
Expand All @@ -32,11 +33,12 @@ import { getQueryKey } from './lib/get-query-key.js';
import { normalizeSql } from './lib/normalize-sql.js';
import { parseDatabasePath } from './lib/parse-database-path.js';
import { mutationLock } from './lib/mutation-lock.js';
import { normalizeDatabaseFile } from './lib/normalize-database-file.js';

export class SQLocal {
protected config: ClientConfig;
protected clientKey: QueryKey;
protected processor?: SQLocalProcessor | Worker;
protected processor: SQLocalProcessor | Worker;
protected isDestroyed: boolean = false;
protected bypassMutationLock: boolean = false;
protected userCallbacks = new Map<string, CallbackUserFunction['func']>();
Expand All @@ -48,7 +50,7 @@ export class SQLocal {
]
>();

protected proxy?: WorkerProxy;
protected proxy: WorkerProxy;
protected reinitChannel: BroadcastChannel;

constructor(databasePath: DatabasePath);
Expand All @@ -66,15 +68,22 @@ export class SQLocal {
`_sqlocal_reinit_(${clientConfig.databasePath})`
);

if (typeof globalThis.Worker !== 'undefined') {
if (
typeof globalThis.Worker !== 'undefined' &&
processorConfig.databasePath !== ':memory:'
) {
this.processor = new Worker(new URL('./worker', import.meta.url), {
type: 'module',
});
this.processor.addEventListener('message', this.processMessageEvent);
this.proxy = coincident(this.processor) as WorkerProxy;
} else {
this.processor = new SQLocalProcessor(true);
this.processor.onmessage = (message) => this.processMessageEvent(message);
this.proxy = globalThis as WorkerProxy;
}

this.processor?.postMessage({
this.processor.postMessage({
type: 'config',
config: processorConfig,
} satisfies ConfigMessage);
Expand All @@ -89,8 +98,9 @@ export class SQLocal {
switch (message.type) {
case 'success':
case 'data':
case 'error':
case 'buffer':
case 'info':
case 'error':
if (message.queryKey && queries.has(message.queryKey)) {
const [resolve, reject] = queries.get(message.queryKey)!;
if (message.type === 'error') {
Expand Down Expand Up @@ -124,8 +134,9 @@ export class SQLocal {
| BatchMessage
| TransactionMessage
| FunctionMessage
| ImportMessage
| GetInfoMessage
| ImportMessage
| ExportMessage
| DeleteMessage
| DestroyMessage
>
Expand All @@ -137,12 +148,6 @@ export class SQLocal {
message.type === 'delete',
this.config,
async () => {
if (!this.processor) {
throw new Error(
'This SQLocal client is not connected to a database. This is likely due to the client being initialized in a server-side environment.'
);
}

if (this.isDestroyed === true) {
throw new Error(
'This SQLocal client has been destroyed. You will need to initialize a new client in order to make further queries.'
Expand Down Expand Up @@ -171,6 +176,7 @@ export class SQLocal {
| TransactionMessage
| FunctionMessage
| GetInfoMessage
| ExportMessage
| DeleteMessage
| DestroyMessage);
break;
Expand Down Expand Up @@ -369,7 +375,7 @@ export class SQLocal {
functionType: 'scalar',
});

if (this.proxy && this.proxy !== globalThis) {
if (this.proxy !== globalThis) {
this.proxy[key] = func;
}
};
Expand All @@ -385,19 +391,33 @@ export class SQLocal {
};

getDatabaseFile = async (): Promise<File> => {
const { directories, fileName, getDirectoryHandle } = parseDatabasePath(
this.config.databasePath
);
const tempFileName = `backup-${Date.now()}--${fileName}`;
const tempFilePath = `${directories.join('/')}/${tempFileName}`;

await this.exec('VACUUM INTO ?', [tempFilePath]);
let fileName, fileBuffer;
const { storageType } = await this.getDatabaseInfo();

if (storageType === 'opfs') {
const path = parseDatabasePath(this.config.databasePath);
const { directories, getDirectoryHandle } = path;
fileName = path.fileName;
const tempFileName = `backup-${Date.now()}--${fileName}`;
const tempFilePath = `${directories.join('/')}/${tempFileName}`;

await this.exec('VACUUM INTO ?', [tempFilePath]);

const dirHandle = await getDirectoryHandle();
const fileHandle = await dirHandle.getFileHandle(tempFileName);
const file = await fileHandle.getFile();
fileBuffer = await file.arrayBuffer();
await dirHandle.removeEntry(tempFileName);
} else {
const message = await this.createQuery({ type: 'export' });

const dirHandle = await getDirectoryHandle();
const fileHandle = await dirHandle.getFileHandle(tempFileName);
const file = await fileHandle.getFile();
const fileBuffer = await file.arrayBuffer();
await dirHandle.removeEntry(tempFileName);
if (message.type === 'buffer') {
fileName = 'database.sqlite3';
fileBuffer = message.buffer;
} else {
throw new Error('The database failed to export.');
}
}

return new File([fileBuffer], fileName, {
type: 'application/x-sqlite3',
Expand All @@ -413,16 +433,10 @@ export class SQLocal {
| ReadableStream<Uint8Array>,
beforeUnlock?: () => void | Promise<void>
): Promise<void> => {
let database: ArrayBuffer | Uint8Array | ReadableStream<Uint8Array>;

if (databaseFile instanceof Blob) {
database = databaseFile.stream();
} else {
database = databaseFile;
}

await mutationLock('exclusive', false, this.config, async () => {
try {
const database = await normalizeDatabaseFile(databaseFile);

await this.createQuery({
type: 'import',
database,
Expand Down
74 changes: 74 additions & 0 deletions src/lib/normalize-database-file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
type DatabaseFileInput =
| File
| Blob
| ArrayBuffer
| Uint8Array
| ReadableStream<Uint8Array>;

export function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo: 'callback'
): Promise<ArrayBuffer | Uint8Array | (() => Promise<Uint8Array | undefined>)>;
export function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo: 'buffer'
): Promise<ArrayBuffer | Uint8Array>;
export function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo?: undefined
): Promise<ArrayBuffer | Uint8Array | ReadableStream<Uint8Array>>;
export async function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo?: 'callback' | 'buffer'
): Promise<
| ArrayBuffer
| Uint8Array
| ReadableStream<Uint8Array>
| (() => Promise<Uint8Array | undefined>)
> {
let bufferOrStream: ArrayBuffer | Uint8Array | ReadableStream<Uint8Array>;

if (dbFile instanceof Blob) {
bufferOrStream = dbFile.stream();
} else {
bufferOrStream = dbFile;
}

if (bufferOrStream instanceof ReadableStream && convertStreamTo) {
const stream = bufferOrStream;
const reader = stream.getReader();

switch (convertStreamTo) {
case 'callback':
return async () => {
const chunk = await reader.read();
return chunk.value;
};

case 'buffer':
const chunks: Uint8Array[] = [];
let streamDone = false;

while (!streamDone) {
const chunk = await reader.read();
if (chunk.value) chunks.push(chunk.value);
streamDone = chunk.done;
}

const arrayLength = chunks.reduce((length, chunk) => {
return length + chunk.length;
}, 0);
const buffer = new Uint8Array(arrayLength);
let offset = 0;

chunks.forEach((chunk) => {
buffer.set(chunk, offset);
offset += chunk.length;
});

return buffer;
}
} else {
return bufferOrStream;
}
}
Loading

0 comments on commit 2722197

Please sign in to comment.