diff --git a/Cargo.lock b/Cargo.lock index 4cf54131bb..42e092b653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -966,6 +966,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "const-random" version = "0.1.18" @@ -5107,6 +5117,21 @@ dependencies = [ "vortex-proto", ] +[[package]] +name = "vortex-wasm" +version = "0.21.0" +dependencies = [ + "bytes", + "console_error_panic_hook", + "futures-channel", + "futures-util", + "js-sys", + "vortex", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "vortex-zigzag" version = "0.21.0" diff --git a/Cargo.toml b/Cargo.toml index f306a62f33..68d85ec3d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "vortex-proto", "vortex-sampling-compressor", "vortex-scalar", + "vortex-wasm", "xtask", ] resolver = "2" @@ -81,6 +82,7 @@ flexbuffers = "2.0.0" flume = "0.11" fsst-rs = "0.4.1" futures = { version = "0.3", default-features = false } +futures-channel = "0.3" futures-executor = "0.3" futures-util = "0.3" getrandom = "0.2.14" @@ -91,6 +93,7 @@ humansize = "2.1.3" indicatif = "0.17.8" itertools = "0.13.0" jiff = "0.1.8" +js-sys = "0.3" libfuzzer-sys = "0.4" log = "0.4.21" mimalloc = "0.1.42" @@ -125,7 +128,9 @@ tokio = "1.37.0" tracing = "0.1" url = "2" uuid = "1.8.0" +wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" +web-sys = { version = "0.3" } # BEGIN crates published by this project vortex = { version = "0.21.0", path = "./vortex" } diff --git a/vortex-io/src/dispatcher/mod.rs b/vortex-io/src/dispatcher/mod.rs index fb518c157e..2f948d08e3 100644 --- a/vortex-io/src/dispatcher/mod.rs +++ b/vortex-io/src/dispatcher/mod.rs @@ -8,7 +8,7 @@ mod wasm; use std::future::Future; use futures::channel::oneshot; -#[cfg(not(any(feature = "compio", feature = "tokio")))] +#[cfg(not(any(feature = "compio", feature = "tokio", target_arch = "wasm32")))] use vortex_error::vortex_panic; use vortex_error::VortexResult; diff --git a/vortex-wasm/.appveyor.yml b/vortex-wasm/.appveyor.yml new file mode 100644 index 0000000000..50910bd6f3 --- /dev/null +++ b/vortex-wasm/.appveyor.yml @@ -0,0 +1,11 @@ +install: + - appveyor-retry appveyor DownloadFile https://win.rustup.rs/ -FileName rustup-init.exe + - if not defined RUSTFLAGS rustup-init.exe -y --default-host x86_64-pc-windows-msvc --default-toolchain nightly + - set PATH=%PATH%;C:\Users\appveyor\.cargo\bin + - rustc -V + - cargo -V + +build: false + +test_script: + - cargo test --locked diff --git a/vortex-wasm/.gitignore b/vortex-wasm/.gitignore new file mode 100644 index 0000000000..82a9f352f1 --- /dev/null +++ b/vortex-wasm/.gitignore @@ -0,0 +1,11 @@ +/target +/.idea +**/*.rs.bk +Cargo.lock +bin/ +pkg/ +wasm-pack.log +/vortex.js +/vortex.d.ts +tsconfig.tsbuildinfo +/node_modules diff --git a/vortex-wasm/Cargo.toml b/vortex-wasm/Cargo.toml new file mode 100644 index 0000000000..b25e752029 --- /dev/null +++ b/vortex-wasm/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "vortex-wasm" +description = "Vortex WebAssembly bindings" +publish = false +version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +keywords = { workspace = true } +include = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +categories = { workspace = true } +readme = { workspace = true } + +[lib] +crate-type = ["cdylib", "rlib"] + +[features] +default = ["console_error_panic_hook"] + +[dependencies] + +# The `console_error_panic_hook` crate provides better debugging of panics by +# logging them with `console.error`. This is great for development, but requires +# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for +# code size when deploying. +console_error_panic_hook = { version = "0.1.7", optional = true } +bytes = { workspace = true } +vortex = { workspace = true } +js-sys = { workspace = true } +futures-channel = { workspace = true } +futures-util = { workspace = true } +wasm-bindgen = { workspace = true } +wasm-bindgen-futures = { workspace = true } +web-sys = { workspace = true, features = [ + "console", + "Blob", + "File", + "FileReader", + 'ReadableStream', + "ReadableStreamByobReader", + "ReadableStreamByobRequest", + "ReadableStreamDefaultReader", + "ReadableStreamReadResult", + "ReadableStreamReaderMode", + "ReadableStreamGetReaderOptions", +] } + +[profile.release] +# Tell `rustc` to optimize for small code size. +opt-level = "s" diff --git a/vortex-wasm/index.html b/vortex-wasm/index.html new file mode 100644 index 0000000000..fae08cc406 --- /dev/null +++ b/vortex-wasm/index.html @@ -0,0 +1,40 @@ + + + + + + +

Upload a Vortex file

+ + + + + diff --git a/vortex-wasm/package-lock.json b/vortex-wasm/package-lock.json new file mode 100644 index 0000000000..92f1f5cb6d --- /dev/null +++ b/vortex-wasm/package-lock.json @@ -0,0 +1,30 @@ +{ + "name": "@vortex-dev/vortex", + "version": "0.2.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "@vortex-dev/vortex", + "version": "0.2.0", + "license": "ISC", + "devDependencies": { + "typescript": "~5.6.2" + } + }, + "node_modules/typescript": { + "version": "5.6.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.6.3.tgz", + "integrity": "sha512-hjcS1mhfuyi4WW8IWtjP7brDrG2cuDZukyrYrSauoXGNgx0S7zceP07adYkJycEr56BOUTNPzbInooiN3fn1qw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + } + } +} diff --git a/vortex-wasm/package.json b/vortex-wasm/package.json new file mode 100644 index 0000000000..c89dd0b891 --- /dev/null +++ b/vortex-wasm/package.json @@ -0,0 +1,27 @@ +{ + "name": "@vortex-dev/vortex", + "version": "0.2.0", + "description": "Vortex WASM bindings (WIP)", + "type": "module", + "module": "./vortex.js", + "types": "./vortex.d.ts", + "directories": { + "test": "tests" + }, + "scripts": { + "build": "wasm-pack build --target web && tsc" + }, + "author": "", + "license": "ISC", + "files": [ + "vortex.ts", + "vortex.js", + "vortex.d.ts", + "pkg/vortex_wasm.js", + "pkg/vortex_wasm_bg.wasm", + "pkg/vortex_wasm_bg.wasm.d.ts" + ], + "devDependencies": { + "typescript": "~5.6.2" + } +} diff --git a/vortex-wasm/src/blob.rs b/vortex-wasm/src/blob.rs new file mode 100644 index 0000000000..71345ca93e --- /dev/null +++ b/vortex-wasm/src/blob.rs @@ -0,0 +1,69 @@ +use std::cell::RefCell; +use std::future::Future; +use std::rc::Rc; + +use bytes::{Bytes, BytesMut}; +use futures_channel::oneshot; +use futures_util::FutureExt; +use js_sys::Uint8Array; +use vortex::io::VortexReadAt; +use wasm_bindgen::closure::Closure; +use wasm_bindgen::JsCast; +use web_sys::{Blob, FileReader}; + +#[derive(Clone)] +pub struct BlobReader(pub Rc>); + +// (•_•) it's time to get +// ( •_•)>⌐■-■ ... +// (⌐■_■) Send + Sync +// +// This is safe because the browser runs single-threaded. +// TODO(aduffy): is this actually safe? +unsafe impl Send for BlobReader {} +unsafe impl Sync for BlobReader {} + +impl VortexReadAt for BlobReader { + fn read_byte_range( + &self, + pos: u64, + len: u64, + ) -> impl Future> + 'static { + let this = self.clone(); + web_sys::console::log_1(&format!("read_byte_range({pos}, {len})").into()); + + let (tx, rx) = oneshot::channel(); + + let start: i32 = pos.try_into().unwrap(); + let end: i32 = (pos + len).try_into().unwrap(); + let sliced = this.0.borrow().slice_with_i32_and_i32(start, end).unwrap(); + + let file_reader = FileReader::new().unwrap(); + let file_reader_cb = file_reader.clone(); + + // Send the onload handler + let loadend = Closure::once_into_js(move || { + let array_buf = file_reader_cb.result().unwrap(); + let array = Uint8Array::new(array_buf.as_ref()); + let mut result = BytesMut::with_capacity(len.try_into().unwrap()); + unsafe { + result.set_len(result.capacity()); + } + array.copy_to(&mut result); + + // Send the result to the main thread. + tx.send(result).unwrap(); + }); + file_reader.set_onloadend(loadend.dyn_ref()); + + // Trigger the streaming read. + file_reader.read_as_array_buffer(&sliced).unwrap(); + + // Return the reader which will be awaited. + rx.map(|res| Ok(res.unwrap().freeze())) + } + + fn size(&self) -> impl Future> + 'static { + std::future::ready(Ok(self.0.borrow().size() as u64)) + } +} diff --git a/vortex-wasm/src/lib.rs b/vortex-wasm/src/lib.rs new file mode 100644 index 0000000000..2e3487f6b5 --- /dev/null +++ b/vortex-wasm/src/lib.rs @@ -0,0 +1,275 @@ +mod blob; +mod utils; + +use std::cell::RefCell; +use std::convert::Into; +use std::rc::Rc; + +use futures_util::StreamExt; +use vortex::array::ChunkedArray; +use vortex::compute::{scalar_at, slice}; +use vortex::dtype::{DType, PType}; +use vortex::file::{LayoutContext, LayoutDeserializer, VortexReadBuilder}; +use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT; +use vortex::scalar::Scalar; +use vortex::{ArrayData, IntoArrayData}; +use wasm_bindgen::prelude::*; +use web_sys::js_sys::{Map, Object, Uint8Array}; +use web_sys::Blob; + +use crate::blob::BlobReader; +use crate::utils::set_panic_hook; + +#[wasm_bindgen(js_name = File)] +pub struct VortexFile { + reader: BlobReader, +} + +#[wasm_bindgen(start)] +fn start() { + web_sys::console::log_1(&"vortex-wasm starting".into()); + web_sys::console::log_1(&"setting panic hook...".into()); + set_panic_hook(); +} + +#[wasm_bindgen(js_class = File)] +impl VortexFile { + #[wasm_bindgen(js_name = fromBlob)] + pub async fn from_blob(blob: Blob) -> Self { + Self { + reader: BlobReader(Rc::new(RefCell::new(blob))), + } + } + + /// Log the DType to the console. + #[wasm_bindgen(js_name = printSchema)] + pub async fn print_schema(&self) { + // let buffer = self.buffer.clone(); + let inner = self.reader.clone(); + let reader = VortexReadBuilder::new( + inner, + LayoutDeserializer::new( + ALL_ENCODINGS_CONTEXT.clone(), + LayoutContext::default().into(), + ), + ) + .build() + .await + .expect("building reader"); + + web_sys::console::log_1(&format!("dtype = {}", reader.dtype()).into()); + web_sys::console::log_1(&format!("row_count = {}", reader.row_count()).into()); + } + + /// Materialize the entire array. + #[wasm_bindgen] + pub async fn collect(&self) -> ArrayBatch { + let mut reader = VortexReadBuilder::new( + self.reader.clone(), + // self.buffer.clone(), + LayoutDeserializer::new( + ALL_ENCODINGS_CONTEXT.clone(), + LayoutContext::default().into(), + ), + ) + .build() + .await + .expect("building reader"); + + let dtype = reader.dtype().clone(); + let mut chunks = Vec::new(); + while let Some(next) = reader.next().await { + let next = next.unwrap(); + web_sys::console::log_1(&format!("loaded another chunk of len {}", next.len()).into()); + chunks.push(next); + } + + let chunked = ChunkedArray::try_new(chunks, dtype).unwrap().into_array(); + + ArrayBatch { inner: chunked } + } +} + +#[wasm_bindgen] +pub struct ArrayBatch { + inner: ArrayData, +} + +#[wasm_bindgen] +impl ArrayBatch { + /// Get the number of elements in this array. + #[wasm_bindgen] + pub fn length(&self) -> u32 { + self.inner.len() as u32 + } + + /// Get the n-th value of an array. + #[wasm_bindgen] + pub fn scalar_at(&self, index: u32) -> JsValue { + let scalar = scalar_at(&self.inner, index as usize).unwrap(); + to_js_val(scalar) + } + + /// Slice the array to an element range. + #[wasm_bindgen] + pub fn slice(&self, start: u32, end: u32) -> Self { + Self { + inner: slice(&self.inner, start as usize, end as usize).unwrap(), + } + } + + /// Return the column names if array is of Struct-type. + /// + /// Returns `undefined` for all other types. + #[wasm_bindgen] + pub fn columns(&self) -> JsValue { + let Some(struct_array) = self.inner.as_struct_array() else { + return JsValue::undefined(); + }; + + // Get a column description for each name. + let names = js_sys::Array::new(); + for name in struct_array.names().iter() { + names.push(&JsValue::from_str(name.as_ref())); + } + + names.into() + } + + /// Get the WASM bindgen types. + #[wasm_bindgen] + pub fn types(&self) -> JsValue { + let Some(struct_array) = self.inner.as_struct_array() else { + return JsValue::undefined(); + }; + + let dtypes = js_sys::Array::new(); + for dtype in struct_array.dtypes() { + dtypes.push(&JsValue::from_str(dtype.to_string().as_str())); + } + + dtypes.into() + } + + // Get the column from an array. + #[wasm_bindgen] + pub fn column(&self, name: &str) -> Self { + let array = self + .inner + .as_struct_array() + .expect("StructArray") + .field_by_name(name) + .expect("field not found on struct"); + + Self { inner: array } + } + + // Materialize the all the data as a JS Array. + // + // Note: This is very slow and should be avoided. + pub fn to_js(&self) -> JsValue { + let js_array = js_sys::Array::new(); + + for i in 0..self.length() { + js_array.push(&self.scalar_at(i)); + } + + js_array.into() + } +} + +fn to_js_val(scalar: Scalar) -> JsValue { + match scalar.dtype() { + DType::Null => JsValue::null(), + DType::Bool(_) => scalar + .as_bool() + .value() + .map(JsValue::from_bool) + .unwrap_or_else(JsValue::null), + DType::Primitive(ptype, _) => { + // The scalar needs to be up-cast to f64 because that is all + // JavaScript can represent. + let maybe_f64_scalar = match ptype { + PType::U8 => scalar.as_primitive().typed_value::().map(JsValue::from), + PType::U16 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::U32 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::U64 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::I8 => scalar.as_primitive().typed_value::().map(JsValue::from), + PType::I16 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::I32 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::I64 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::F16 => { + panic!("invalid type"); + } + PType::F32 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + PType::F64 => scalar + .as_primitive() + .typed_value::() + .map(JsValue::from), + }; + + // fallback to null + maybe_f64_scalar.unwrap_or_else(JsValue::null) + } + DType::Utf8(_) => scalar + .as_utf8() + .value() + .map(|string| JsValue::from_str(string.as_str())) + .unwrap_or_else(JsValue::null), + DType::Binary(_) => { + scalar + .as_binary() + .value() + .map(|binary| { + // Copy the data into the Uint8Array. + let buffer = Uint8Array::new_with_length(binary.len() as u32); + buffer.copy_from(binary.as_slice()); + JsValue::from(buffer) + }) + .unwrap_or_else(JsValue::null) + } + DType::Struct(..) => { + // recursively generate the struct + let struct_scalar = scalar.as_struct(); + let field_names = struct_scalar.dtype().as_struct().unwrap().names().clone(); + let Some(fields) = struct_scalar.fields() else { + return JsValue::null(); + }; + + // Create a new JS Object to hold all the fields. + let properties = Map::new(); + for (field_name, scalar) in field_names.iter().zip(fields.into_iter()) { + properties.set(&field_name.to_string().into(), &to_js_val(scalar)); + } + + // Freeze the object + let js_obj = Object::from_entries(properties.as_ref()).unwrap(); + Object::freeze(&js_obj).into() + } + DType::List(..) => { + panic!("lol"); + } + DType::Extension(_) => JsValue::from_str("fix handling of ExtensionDType"), + } +} diff --git a/vortex-wasm/src/utils.rs b/vortex-wasm/src/utils.rs new file mode 100644 index 0000000000..b1d7929dc9 --- /dev/null +++ b/vortex-wasm/src/utils.rs @@ -0,0 +1,10 @@ +pub fn set_panic_hook() { + // When the `console_error_panic_hook` feature is enabled, we can call the + // `set_panic_hook` function at least once during initialization, and then + // we will get better error messages if our code ever panics. + // + // For more details see + // https://github.com/rustwasm/console_error_panic_hook#readme + #[cfg(feature = "console_error_panic_hook")] + console_error_panic_hook::set_once(); +} diff --git a/vortex-wasm/tsconfig.json b/vortex-wasm/tsconfig.json new file mode 100644 index 0000000000..9e91faf076 --- /dev/null +++ b/vortex-wasm/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "composite": true, + "skipLibCheck": true, + "module": "ESNext", + "target": "ESNext", + "moduleResolution": "bundler", + "lib": [ + "ES2020", + "DOM", + "DOM.Iterable" + ], + /* Linting */ + "strict": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noFallthroughCasesInSwitch": true, + "noUncheckedSideEffectImports": true + }, + "include": [ + "vortex.ts" + ] +} diff --git a/vortex-wasm/vortex.ts b/vortex-wasm/vortex.ts new file mode 100644 index 0000000000..47cfead8bc --- /dev/null +++ b/vortex-wasm/vortex.ts @@ -0,0 +1,26 @@ +import init, { File } from './pkg/vortex_wasm'; + +let loaded = false; + +async function loadInner(module_or_path?: any) { + await init(module_or_path); + + loaded = true; +} + +/** + * Initialize the Vortex WASM module. + * + * Once the promise resolves, Vortex objects are usable. + * + * @param module_or_path Optional location to load WASM module (default: load from dist). + */ +export async function vortexLoad(module_or_path?: any) { + if (!loaded) { + await loadInner(module_or_path); + } +} + +export default { + File, +};