Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for DuckDB #25

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
},
"dependencies": {
"arquero": "^3.0.0",
"duckdb": "^0.5.1",
"fast-csv": "^4.3.6",
"pg": "^8.5.1",
"uuid": "^8.3.2"
Expand Down
190 changes: 190 additions & 0 deletions src/databases/duckdb/duckdb-database.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import {v4 as uuid} from 'uuid';
import * as fs from 'fs';
import * as fastcsv from 'fast-csv';
import {DBTable} from '../../db-table';
import {Database} from '../database';
import {DuckDBTableView} from './duckdb-table-view';
import {Database as DuckDB} from 'duckdb';


// Module-level DuckDB instance opened on ':memory:'; DuckDBDatabase connects
// to it whenever its constructor is called without an explicit `db`.
const defaultDB = new DuckDB(':memory:');

/** @typedef {'TEXT' | 'BOOLEAN' | 'JSON' | 'TIMESTAMPTZ' | 'DOUBLE'} DuckDBType */

/**
 * Infers the SQL column type to use for a JavaScript value.
 *
 * `null`/`undefined` carry no type information, so `null` is returned and the
 * caller is expected to keep looking at other values of the same column.
 *
 * @param {any} value value taken from a table cell
 * @returns {DuckDBType | null} the matching column type, or null when the
 *   value is null/undefined
 */
function getPGType(value) {
  if (value === null || value === undefined) {
    return null;
  }

  switch (typeof value) {
    case 'string':
      return 'TEXT';
    case 'number':
      return 'DOUBLE';
    case 'boolean':
      return 'BOOLEAN';
    default:
      // 'TIMESTAMPTZ' (not 'TIMESTAMPZ') is the alias of
      // TIMESTAMP WITH TIME ZONE; the misspelled name is rejected in DDL.
      return value instanceof Date ? 'TIMESTAMPTZ' : 'JSON';
  }
}

/**
 * Builds a parameterized INSERT statement with one `?` placeholder per column.
 *
 * @param {string} name target table name (interpolated as-is)
 * @param {string[]} cols column names, in the order values will be bound
 * @returns {string} the INSERT statement text
 */
function insertInto(name, cols) {
  const columnList = cols.join(',');
  const placeholders = cols.map(() => '?').join(',');
  return 'INSERT INTO ' + name + ' (' + columnList + ') VALUES (' + placeholders + ')';
}

/**
 * Database implementation that runs queries through a DuckDB connection.
 */
export class DuckDBDatabase extends Database {
  /**
   * @param {DuckDB} [db] DuckDB instance to connect to; the module-level
   *   `defaultDB` is used when omitted.
   */
  constructor(db) {
    super();

    /** @type {Connection} */
    this._connection = (db || defaultDB).connect();
  }

  /**
   * Creates a table handle for an existing table.
   *
   * @param {string} name table name
   * @returns {DBTable}
   */
  table(name) {
    const pbuilder = this.getColumnNames(name).then(
      colNames => new DuckDBTableView(name, colNames, null, null, null, this),
    );
    return new DBTable(pbuilder);
  }

  /**
   * Creates a table from a CSV file and loads every row after the first one.
   *
   * @param {string} path path of the CSV file to read
   * @param {{name: string, type: DuckDBType}[]} schema column names and types,
   *   in the order the columns appear in the file
   * @param {string} [name] table name; a unique name is generated when omitted
   * @returns {DBTable}
   */
  fromCSV(path, schema, name) {
    name = name || `__aq__table__${uuid().split('-').join('')}__`;
    const columnNames = schema.map(({name}) => name);

    // Settle only once the whole file has been parsed; returning the row
    // array right after `pipe()` would hand back an array that is still empty.
    const results = new Promise((resolve, reject) => {
      const rows = [];
      const source = fs.createReadStream(path);
      // `pipe` does not forward source errors, so listen on both streams.
      source.on('error', reject);
      source
        .pipe(fastcsv.parse())
        .on('error', reject)
        // Wrap `push` so it keeps its receiver; a bare `rows.push` loses `this`.
        .on('data', row => rows.push(row))
        .on('end', () => {
          // The first parsed row is discarded (treated as the header line).
          rows.shift();
          resolve(rows);
        });
    }).then(async rows => {
      await this.query(`CREATE TABLE ${name} (${schema.map(({name, type}) => name + ' ' + type).join(',')})`);

      const insert = insertInto(name, columnNames);
      // `for...of` yields the row arrays themselves (`for...in` yields indices).
      for (const row of rows) {
        await this.query(insert, row);
      }
    });

    return getTableAfter(this, results, name);
  }

  /**
   * Creates a table from an Arquero table and copies all of its rows.
   *
   * Each column's type is inferred from its first non-null value ('TEXT' when
   * the column has none).
   *
   * @param {import('arquero').internal.Table} table
   * @param {string} [name] table name; a unique name is generated when omitted
   * @returns {DBTable} its underlying promise rejects when a later value's
   *   inferred type differs from the type chosen for its column
   */
  fromArquero(table, name) {
    name = name || `__aq__table__${uuid().split('-').join('')}__`;
    const columnNames = table.columnNames();
    const numRows = table.numRows();
    const results = Promise.resolve()
      .then(() =>
        columnNames.map(cn => {
          const column = table.getter(cn);
          for (let j = 0; j < numRows; j++) {
            const type = getPGType(column(j));
            if (type !== null) {
              return type;
            }
          }
          return 'TEXT';
        }),
      )
      .then(async types => {
        await this.query(`CREATE TABLE ${name} (${columnNames.map((cn, i) => cn + ' ' + types[i]).join(',')})`);

        // TODO: use prepare -> run
        const insert = insertInto(name, columnNames);
        for (const row of table) {
          const values = [];
          for (const [i, cn] of columnNames.entries()) {
            const value = row[cn];
            values.push(value);

            const type = getPGType(value);
            if (types[i] !== type && type !== null) {
              throw new Error('types in column ' + cn + ' do not match');
            }
          }
          await this.query(insert, values);
        }
      });

    return getTableAfter(this, results, name);
  }

  /**
   * Looks up the column names of a table.
   *
   * @param {string} table table name
   * @returns {Promise<string[]>}
   */
  async getColumnNames(table) {
    // Go through `query` (this class has no `_pool`); `PRAGMA table_info`
    // reports each column's name in the `name` field of the result row.
    const info = await this.query(`PRAGMA table_info('${table}')`);
    return info.map(r => r.name);
  }

  /**
   * Runs a SQL statement on the connection.
   *
   * @param {string} text SQL text, optionally containing `?` placeholders
   * @param {any[]} [values] values bound to the placeholders, in order
   * @returns {Promise<object[]>} resolves with the result rows; rejects with
   *   the error reported by the connection
   */
  async query(text, values) {
    values = values || [];
    return new Promise((resolve, reject) => {
      this._connection.all(text, ...values, (err, res) => {
        if (err) {
          // Reject only: throwing inside the driver callback would surface as
          // an uncaught exception instead of reaching the caller.
          reject(err);
          return;
        }

        resolve(res);
      });
    });
  }

  /**
   * Closes the connection by calling its `close()` method.
   */
  async close() {
    await this._connection.close();
  }
}

/**
* @param {Database} db
* @param {Promise<any>} promise
* @param {string} name
*/
function getTableAfter(db, promise, name) {
  // Wait for the pending work first, then read the table's columns and wrap
  // them in a view bound to `db`.
  const buildView = async () => {
    await promise;
    const columns = await db.getColumnNames(name);
    return new DuckDBTableView(name, columns, null, null, null, db);
  };
  return new DBTable(buildView());
}
197 changes: 197 additions & 0 deletions src/databases/duckdb/duckdb-table-view.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import {all} from 'arquero';
import {TableView} from '../table-view';
import isFunction from 'arquero/src/util/is-function';
import verbs from './verbs';
import postgresCodeGen from './pg-code-gen';

/**
 * Table view that accumulates SQL clauses over a source (a table name or
 * another view) and runs the generated query on a DuckDBDatabase.
 */
export class DuckDBTableView extends TableView {
  /**
   * @param {Source} source source table or another sql query
   * @param {string[]} schema column names of the view
   * @param {Clauses} [clauses] object of sql clauses
   * @param {string[]} [group]
   * @param {OrderInfo} [order]
   * @param {import('./duckdb-database').DuckDBDatabase} [database] used only
   *   when `source` is a table name; a view source supplies its own database
   */
  constructor(source, schema, clauses, group, order, database) {
    super();

    /** @type {Source} */
    this._source = source;

    /** @type {string[]} */
    this._columns = schema;

    if (typeof source !== 'string') {
      database = source._database;
    }
    /** @type {import('./duckdb-database').DuckDBDatabase} */
    this._database = database;

    /** @type {Clauses} */
    this._clauses = clauses || {};

    /** @type {string[]} */
    this._group = group;

    /** @type {OrderInfo} */
    this._order = order;
  }

  /**
   *
   * @typedef {object} WrapParams
   * @prop {string[] | (s: string[]) => string[]} columns
   * @prop {Clauses | (c: Clauses) => Clauses} clauses
   * @prop {string[] | (s: string[]) => string[]} group
   * @prop {OrderInfo[] | (o: OrderInfo[]) => OrderInfo[]} order
   */

  /**
   * Returns a new view over the same source with the given parts replaced.
   * Each part may be a value or a function of the current value; omitted
   * parts are carried over.
   *
   * @param {WrapParams} param0
   */
  _append({columns, clauses, group, order}) {
    return new DuckDBTableView(
      this._source,
      columns !== undefined ? (isFunction(columns) ? columns(this._columns) : columns) : this._columns,
      clauses !== undefined ? (isFunction(clauses) ? clauses(this._clauses) : clauses) : this._clauses,
      // Loose comparison: both null and undefined keep the current value here.
      group != undefined ? (isFunction(group) ? group(this._group) : group) : this._group,
      order != undefined ? (isFunction(order) ? order(this._order) : order) : this._order,
      // Forward the database: when the source is a table name the constructor
      // cannot recover it from the source and the new view would have none.
      this._database,
    );
  }

  /**
   * Returns a new view that uses this view as its source. Clauses start empty
   * unless given; the other omitted parts are carried over.
   *
   * @param {WrapParams} param0
   */
  _wrap({columns, clauses, group, order}) {
    return new DuckDBTableView(
      this,
      columns !== undefined ? (isFunction(columns) ? columns(this._columns) : columns) : this._columns,
      clauses !== undefined ? (isFunction(clauses) ? clauses(this._clauses) : clauses) : {},
      group !== undefined ? (isFunction(group) ? group(this._group) : group) : this._group,
      order !== undefined ? (isFunction(order) ? order(this._order) : order) : this._order,
    );
  }

  /**
   * Indicates if the table has a groupby specification.
   * @return {boolean} True if grouped, false otherwise.
   */
  isGrouped() {
    return !!this._group;
  }

  /**
   * Filter function invoked for each column name.
   * @callback NameFilter
   * @param {string} name The column name.
   * @param {number} index The column index.
   * @param {string[]} array The array of names.
   * @return {boolean} Returns true to retain the column name.
   */

  /**
   * The table column names, optionally filtered.
   * @param {NameFilter} [filter] An optional filter function.
   *  If unspecified, all column names are returned.
   * @return {string[]} An array of matching column names.
   */
  columnNames(filter) {
    return filter ? this._columns.filter(filter) : this._columns.slice();
  }

  /**
   * The column name at the given index.
   * @param {number} index The column index.
   * @return {string} The column name,
   *  or undefined if the index is out of range.
   */
  columnName(index) {
    return this._columns[index];
  }

  /**
   * Always returns an empty array; the `name` argument is not used.
   */
  // eslint-disable-next-line no-unused-vars
  column(name) {
    return [];
  }

  /**
   * Generates the SQL text for this view: ungroups, selects all columns and
   * passes this view's order as the `orderby` clause to the code generator.
   * @returns {string}
   */
  _sql() {
    return postgresCodeGen(
      this.ungroup()
        .select(all())
        ._append({clauses: c => ({...c, orderby: this._order}), order: null}),
    );
  }

  /**
   * Runs the view's query and returns the resulting rows.
   *
   * @param {import('arquero/src/table/table').ObjectsOptions} [options]
   * @returns {Promise<object[]>}
   */
  async objects(options = {}) {
    const {grouped, limit, offset} = options;

    if (grouped) {
      throw new Error('TODO: support output grouped table');
    }

    let t = this;
    if (limit !== undefined) {
      t = t._append({clauses: c => ({...c, limit})});
    }
    if (offset !== undefined) {
      t = t._append({clauses: c => ({...c, offset})});
    }

    // DuckDBDatabase#query resolves with the row array itself; there is no
    // `.rows` wrapper as in the Postgres client.
    return t._database.query(t._sql());
  }
}

// Copy the verb implementations imported from './verbs' onto the prototype so
// they are available as methods on every DuckDBTableView instance.
Object.assign(DuckDBTableView.prototype, verbs);

/** @typedef {string | DuckDBTableView} Source _source in DuckDBTableView */

/**
* @typedef {object} AstNode
* @prop {string} type
*/

/**
* @typedef {'INNER' | 'LEFT' | 'RIGHT' | "FULL"} JoinType
*/

/**
* @typedef {object} JoinInfo
* @prop {AstNode} on
* @prop {DuckDBTableView} other
* @prop {JoinType} join_type
*/

/**
* @typedef {object} OrderInfo
* @prop {AstNode[]} exprs
* @prop {boolean[]} descs
*/

/**
* @typedef {object} Clauses _clauses in DuckDBTableView
* @prop {AstNode[]} [select]
* @prop {AstNode[]} [where]
* @prop {AstNode[] | boolean} [groupby]
* @prop {AstNode[]} [having]
* @prop {JoinInfo} [join]
* @prop {OrderInfo} [orderby]
* @prop {number} [limit]
* @prop {number} [offset]
* @prop {Source[]} [concat]
* @prop {Source[]} [union]
* @prop {Source[]} [intersect]
* @prop {Source[]} [except]
*/

/**
* @typedef {object} Schema _schema in DuckDBTableView
* @prop {string[]} columns
* @prop {string[]} [groupby]
*/
Loading