From a92b35d23aa3a512d7dd4eb1eb25fb8216f26d6f Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Thu, 15 Sep 2022 22:06:42 -0700 Subject: [PATCH] [FIX] install `@rapidsai/sql` module's `.jar` files into runtime images (#431) * install sql .jar files into runtime images * default to TCP until we can figure out UCX issues * update types * make cluster workers retain their in-flight tables until they're received by the main proc, then dispose immediately after to avoid OOM's * add a nonce to help ensure worker in-flight message names don't conflict * add logging to the sql demo, fix a UI bug --- dev/dockerfiles/runtime/demo.Dockerfile | 9 ++- dev/dockerfiles/runtime/main.Dockerfile | 8 ++- dev/dockerfiles/runtime/sql.Dockerfile | 8 ++- .../components/querydashboard.jsx | 24 ++++--- modules/demo/sql/sql-cluster-server/index.js | 72 +++++++++++-------- modules/sql/blazingsql/graph.cpp | 6 +- modules/sql/src/cluster.ts | 6 +- modules/sql/src/cluster/remote.ts | 11 ++- modules/sql/src/cluster/worker.ts | 16 ++++- modules/sql/src/config.ts | 4 +- modules/sql/src/context.ts | 2 +- modules/sql/src/graph.ts | 19 ++++- modules/sql/src/rapidsai_sql.ts | 5 +- 13 files changed, 130 insertions(+), 60 deletions(-) diff --git a/dev/dockerfiles/runtime/demo.Dockerfile b/dev/dockerfiles/runtime/demo.Dockerfile index 652efdc75..1cf4edaa0 100644 --- a/dev/dockerfiles/runtime/demo.Dockerfile +++ b/dev/dockerfiles/runtime/demo.Dockerfile @@ -34,7 +34,12 @@ RUN --mount=type=bind,from=build,source=/opt/rapids/,target=/tmp/rapids/ \ -f /tmp/rapids/rapidsai_${x}-*-Linux.tar.gz \ --wildcards --strip-components=2 \ -x "**/lib/rapidsai_${x}.node" ; \ - done + done; \ + tar -C node_modules/@rapidsai/sql/build/Release \ + -f /tmp/rapids/rapidsai_sql-*.tar.gz \ + --wildcards --strip-components=2 \ + -x "*/blazingsql-*.jar" ; + FROM scratch as ucx-deb-amd64 @@ -66,7 +71,7 @@ RUN --mount=type=bind,from=ucx-deb,target=/usr/src/ucx \ # GLEW dependencies libglvnd0 libgl1 libglx0 libegl1 libgles2 libglu1-mesa \ # UCX runtime dependencies - libibverbs1 librdmacm1 libnuma1 \ + libibverbs1 librdmacm1 libnuma1 numactl \ # node-canvas dependencies libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libjpeg8 libgif7 librsvg2-2 \ # SQL dependencies diff --git a/dev/dockerfiles/runtime/main.Dockerfile b/dev/dockerfiles/runtime/main.Dockerfile index 64fe3d337..cd4866dbc 100644 --- a/dev/dockerfiles/runtime/main.Dockerfile +++ b/dev/dockerfiles/runtime/main.Dockerfile @@ -33,7 +33,11 @@ RUN --mount=type=bind,from=build,source=/opt/rapids/,target=/tmp/rapids/ \ -f /tmp/rapids/rapidsai_${x}-*-Linux.tar.gz \ --wildcards --strip-components=2 \ -x "**/lib/rapidsai_${x}.node" ; \ - done + done; \ + tar -C node_modules/@rapidsai/sql/build/Release \ + -f /tmp/rapids/rapidsai_sql-*.tar.gz \ + --wildcards --strip-components=2 \ + -x "*/blazingsql-*.jar" ; FROM scratch as ucx-deb-amd64 @@ -65,7 +69,7 @@ RUN --mount=type=bind,from=ucx-deb,target=/usr/src/ucx \ # GLEW dependencies libglvnd0 libgl1 libglx0 libegl1 libgles2 libglu1-mesa \ # UCX runtime dependencies - libibverbs1 librdmacm1 libnuma1 \ + libibverbs1 librdmacm1 libnuma1 numactl \ # node-canvas dependencies libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libjpeg8 libgif7 librsvg2-2 \ # SQL dependencies diff --git a/dev/dockerfiles/runtime/sql.Dockerfile b/dev/dockerfiles/runtime/sql.Dockerfile index 6b36f4e1d..624e643b7 100644 --- a/dev/dockerfiles/runtime/sql.Dockerfile +++ b/dev/dockerfiles/runtime/sql.Dockerfile @@ -24,7 +24,11 @@ RUN --mount=type=bind,from=build,source=/opt/rapids/,target=/tmp/rapids/ \ -f /tmp/rapids/rapidsai_${x}-*-Linux.tar.gz \ --wildcards --strip-components=2 \ -x "**/lib/rapidsai_${x}.node" ; \ - done + done; \ + tar -C node_modules/@rapidsai/sql/build/Release \ + -f /tmp/rapids/rapidsai_sql-*.tar.gz \ + --wildcards --strip-components=2 \ + -x "*/blazingsql-*.jar" ; FROM scratch as ucx-deb-amd64 @@ -46,7 +50,7 @@ RUN --mount=type=bind,from=ucx-deb,target=/usr/src/ucx \ && apt update \ && apt install -y --no-install-recommends \ # UCX runtime dependencies - libibverbs1 librdmacm1 libnuma1 \ + libibverbs1 librdmacm1 libnuma1 numactl \ # SQL dependencies openjdk-8-jre-headless libboost-regex-dev libboost-system-dev libboost-filesystem-dev \ # Install UCX diff --git a/modules/demo/sql/sql-cluster-server/components/querydashboard.jsx b/modules/demo/sql/sql-cluster-server/components/querydashboard.jsx index 61ebfa5f6..0ed0a8a08 100644 --- a/modules/demo/sql/sql-cluster-server/components/querydashboard.jsx +++ b/modules/demo/sql/sql-cluster-server/components/querydashboard.jsx @@ -37,6 +37,10 @@ const columns = [ { id: 'text', label: 'Text', minWidth: 500 } ]; +/** + * + * @param {import('apache-arrow').Table} table + */ function formatData(table) { let rows = []; if (table.length == 0) { @@ -44,19 +48,19 @@ function formatData(table) { } const resultsToDisplay = table.length < MAX_RESULTS_TO_DISPLAY ? table.length : MAX_RESULTS_TO_DISPLAY; - const ids = [...table.getColumn("id")].map((x) => +x).slice(0, resultsToDisplay); - const revids = [...table.getColumn("revid")].map((x) => +x).slice(0, resultsToDisplay); - const urls = [...table.getColumn("url")].slice(0, resultsToDisplay); - const titles = [...table.getColumn("title")].slice(0, resultsToDisplay); - const texts = [...table.getColumn("text")].slice(0, resultsToDisplay); + const ids = [...table.getChild("id") || []].map(Number).slice(0, resultsToDisplay); + const revids = [...table.getChild("revid") || []].map(Number).slice(0, resultsToDisplay); + const urls = [...table.getChild("url") || []].slice(0, resultsToDisplay); + const titles = [...table.getChild("title") || []].slice(0, resultsToDisplay); + const texts = [...table.getChild("text") || []].slice(0, resultsToDisplay); for (let i = 0; i < resultsToDisplay; ++i) { rows.push({ - id: ids[i], - revid: revids[i], - url: urls[i], - title: titles[i], - text: texts[i] + id: ids[i] || '', + revid: revids[i] || '', + url: urls[i] || '', + title: titles[i] || '', + text: texts[i] || '', }); } diff --git a/modules/demo/sql/sql-cluster-server/index.js b/modules/demo/sql/sql-cluster-server/index.js index 451b2335f..0f29674b7 100755 --- a/modules/demo/sql/sql-cluster-server/index.js +++ b/modules/demo/sql/sql-cluster-server/index.js @@ -14,11 +14,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +const fs = require('fs'); const {performance} = require('perf_hooks'); -const {SQLCluster} = require('@rapidsai/sql'); -const {DataFrame} = require('@rapidsai/cudf'); const {RecordBatchStreamWriter} = require('apache-arrow'); -const fs = require('fs'); +const {SQLCluster} = require('@rapidsai/sql'); +const {DataFrame, scope} = require('@rapidsai/cudf'); const fastify = require('fastify')({ pluginTimeout: 30000, @@ -48,7 +48,24 @@ fastify.register((require('fastify-arrow'))) dev: process.env.NODE_ENV !== 'production', })) .register(async (instance, opts, done) => { - sqlCluster = await SQLCluster.init({numWorkers: 10}); + const logPath = `${__dirname}/.logs`; + require('rimraf').sync(logPath); + fs.mkdirSync(logPath, {recursive: true}); + + sqlCluster = await SQLCluster.init({ + numWorkers: Infinity, + enableLogging: true, + // allocationMode: 'pool_memory_resource', + configOptions: { + PROTOCOL: 'TCP', + ENABLE_TASK_LOGS: true, + ENABLE_COMMS_LOGS: true, + ENABLE_OTHER_ENGINE_LOGS: true, + ENABLE_GENERAL_ENGINE_LOGS: true, + BLAZING_LOGGING_DIRECTORY: logPath, + BLAZING_LOCAL_LOGGING_DIRECTORY: logPath, + } + }); await sqlCluster.createCSVTable('test_table', DATA_PATHS); done(); }) @@ -57,23 +74,21 @@ fastify.register((require('fastify-arrow'))) fastify.post('/run_query', async function(request, reply) { try { request.log.info({query: request.body}, `calling sqlCluster.sql()`); - const t0 = performance.now(); - const dfs = await sqlCluster.sql(request.body).catch((err) => { - request.log.error({err}, `Error calling sqlCluster.sql`); - return new DataFrame(); - }); - const t1 = performance.now(); - const queryTime = t1 - t0; - - const {results, resultCount} = head(dfs, 500); - const arrowTable = results.toArrow(); - arrowTable.schema.metadata.set('queryTime', queryTime); - arrowTable.schema.metadata.set('queryResults', resultCount); - RecordBatchStreamWriter.writeAll(arrowTable).pipe(reply.stream()); + await scope(async () => { + const t0 = performance.now(); + const dfs = await sqlCluster.sql(request.body).catch((err) => { + request.log.error({err}, `Error calling sqlCluster.sql`); + return new DataFrame(); + }); + const t1 = performance.now(); + const queryTime = t1 - t0; - // TODO: remove these calls to dispose once scope() supports async - results.dispose(); - dfs.forEach((df) => df.dispose()); + const {result, rowCount} = head(dfs, 500); + const arrowTable = result.toArrow(); + arrowTable.schema.metadata.set('queryTime', queryTime); + arrowTable.schema.metadata.set('queryResults', rowCount); + RecordBatchStreamWriter.writeAll(arrowTable).pipe(reply.stream()); + }); } catch (err) { request.log.error({err}, '/run_query error'); reply.code(500).send(err); @@ -91,16 +106,15 @@ function head(dfs, rows) { let rowCount = 0; for (let i = 0; i < dfs.length; ++i) { - if (dfs[i].numRows == 0) continue; - rowCount += dfs[i].numRows; - if (result.numRows <= rows) { - const head = dfs[i].head(rows - result.numRows); - result = result.concat(head); - - // TODO: remove this call to dispose once scope() supports async - head.dispose(); + if (dfs[i].numRows > 0) { + rowCount += dfs[i].numRows; + if (result.numRows <= rows) { + result = scope(() => { // + return result.concat(dfs[i].head(rows - result.numRows)); + }); + } } } - return {results: result, resultCount: rowCount}; + return {result, rowCount}; } diff --git a/modules/sql/blazingsql/graph.cpp b/modules/sql/blazingsql/graph.cpp index ef92a2420..2866efee2 100644 --- a/modules/sql/blazingsql/graph.cpp +++ b/modules/sql/blazingsql/graph.cpp @@ -85,6 +85,7 @@ Napi::Value ExecutionGraph::send(Napi::CallbackInfo const& info) { int32_t dst_ral_id = args[0]; Napi::Array data_frames = args[1]; + int32_t nonce = args[2]; auto messages = Napi::Array::New(env, data_frames.Length()); for (int i = 0; i < data_frames.Length(); ++i) { @@ -97,8 +98,9 @@ Napi::Value ExecutionGraph::send(Napi::CallbackInfo const& info) { auto ctx_token = std::to_string(_graph->get_last_kernel()->input_cache()->get_context()->getContextToken()); - std::string message = "broadcast_table_message_" + std::to_string(i); - messages[i] = message; + std::string message = + "broadcast_table_message_" + std::to_string(nonce) + "_" + std::to_string(i); + messages[i] = message; _context.Value()->send(dst_ral_id, ctx_token, message, names, *table); } diff --git a/modules/sql/src/cluster.ts b/modules/sql/src/cluster.ts index 7ea6a2f73..ef3419fd2 100644 --- a/modules/sql/src/cluster.ts +++ b/modules/sql/src/cluster.ts @@ -1,4 +1,4 @@ -// Copyright (c) 2021, NVIDIA CORPORATION. +// Copyright (c) 2021-2022, NVIDIA CORPORATION. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ export interface Worker { createContext(props: Omit): Promise; } -export interface ClusterProps { +export interface ClusterProps extends ContextProps { ip: string; port: number; numWorkers: number; @@ -54,7 +54,7 @@ export class SQLCluster { * const cluster = await Cluster.init(); * ``` */ - public static async init(options: Partial&Partial = {}) { + public static async init(options: Partial = {}) { const {numWorkers = Device.numDevices, ip = '0.0.0.0', port = 4000} = options; const { networkIfaceName = 'lo', diff --git a/modules/sql/src/cluster/remote.ts b/modules/sql/src/cluster/remote.ts index 29234957e..acac5bc1f 100644 --- a/modules/sql/src/cluster/remote.ts +++ b/modules/sql/src/cluster/remote.ts @@ -86,8 +86,15 @@ export class RemoteSQLWorker implements Worker { public sql(query: string, token: number) { return this._send({type: 'sql', query, token, destinationId: this._cluster.context.id}) - .then(({messageIds}: {messageIds: string[]}) => - Promise.all(messageIds.map((id: string) => this._cluster.context.pull(id)))); + .then(({messageIds}: {messageIds: string[]}) => { + return Promise.all(messageIds.map((messageId: string) => { + return this._cluster.context.pull(messageId).then((df) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._send({type: 'release', messageId}); + return df; + }); + })); + }); } private _send({type, ...rest}: any = {}) { diff --git a/modules/sql/src/cluster/worker.ts b/modules/sql/src/cluster/worker.ts index e3e25af2d..10cc9c891 100644 --- a/modules/sql/src/cluster/worker.ts +++ b/modules/sql/src/cluster/worker.ts @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +import {DataFrame} from '@rapidsai/cudf'; + import {ContextProps, UcpContext} from '../addon'; import {SQLContext} from '../context'; let context: SQLContext; +const allInFlightTables: Record = {}; function die({code = 0}: any) { process.exit(code); } @@ -45,7 +48,17 @@ function createORCTable({name, paths}: {name: string, paths: string[]}) { async function sql({query, token, destinationId}: {uuid: string, query: string, token: number; destinationId: number}) { - return {messageIds: await context.sql(query, token).sendTo(destinationId)}; + const newInFlightTables = await context.sql(query, token).sendTo(destinationId); + Object.assign(allInFlightTables, newInFlightTables); + return {messageIds: Object.keys(newInFlightTables)}; +} + +function release({messageId}: {messageId: string}) { + const df = allInFlightTables[messageId]; + if (df) { + delete allInFlightTables[messageId]; + df.dispose(); + } } process.on('message', ({type, ...opts}: any) => { @@ -55,6 +68,7 @@ process.on('message', ({type, ...opts}: any) => { case 'kill': return die(opts); case 'init': return init(opts); case 'sql': return await sql(opts); + case 'release': return release(opts); case 'dropTable': return dropTable(opts); case 'createDataFrameTable': return await createDataFrameTable(opts); case 'createCSVTable': return createCSVTable(opts); diff --git a/modules/sql/src/config.ts b/modules/sql/src/config.ts index b82c65c62..d266e43ca 100644 --- a/modules/sql/src/config.ts +++ b/modules/sql/src/config.ts @@ -1,4 +1,4 @@ -// Copyright (c) 2021, NVIDIA CORPORATION. +// Copyright (c) 2021-2022, NVIDIA CORPORATION. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,5 +48,5 @@ export const defaultContextConfigValues = { export const defaultClusterConfigValues = { ...defaultContextConfigValues, - PROTOCOL: 'UCX' + PROTOCOL: 'TCP' }; diff --git a/modules/sql/src/context.ts b/modules/sql/src/context.ts index 0d4794afa..44694db1e 100644 --- a/modules/sql/src/context.ts +++ b/modules/sql/src/context.ts @@ -40,7 +40,7 @@ export class SQLContext { declare private _schema: any; declare private _generator: any; declare private _tables: Map; - declare private _configOptions: Record; + declare private _configOptions: typeof defaultContextConfigValues; constructor(options: Partial = {}) { this._db = CatalogDatabaseImpl('main'); diff --git a/modules/sql/src/graph.ts b/modules/sql/src/graph.ts index 98686d0be..e08569ad5 100644 --- a/modules/sql/src/graph.ts +++ b/modules/sql/src/graph.ts @@ -1,4 +1,4 @@ -// Copyright (c) 2021, NVIDIA CORPORATION. +// Copyright (c) 2021-2022, NVIDIA CORPORATION. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,11 +16,15 @@ import {DataFrame, Table} from '@rapidsai/cudf'; +let nonce = Math.random() * 1e3 | 0; + export class ExecutionGraph { constructor(private _graph?: import('./rapidsai_sql').ExecutionGraph) {} start(): void { this._graph?.start(); } + then() { return this.result(); } + async result() { const {names, tables} = this._graph ? (await this._graph.result()) : {names: [], tables: [new Table({})]}; @@ -33,5 +37,16 @@ export class ExecutionGraph { return results; } - async sendTo(id: number) { return await this.result().then((df) => this._graph?.sendTo(id, df)); } + async sendTo(id: number) { + return await this.result().then((dfs) => { + const {_graph} = this; + const inFlightTables: Record = {}; + if (_graph) { + _graph.sendTo(id, dfs, `${nonce++}`).forEach((messageId, i) => { // + inFlightTables[messageId] = dfs[i]; + }); + } + return inFlightTables; + }); + } } diff --git a/modules/sql/src/rapidsai_sql.ts b/modules/sql/src/rapidsai_sql.ts index 0f6233e21..010dad8e5 100644 --- a/modules/sql/src/rapidsai_sql.ts +++ b/modules/sql/src/rapidsai_sql.ts @@ -14,6 +14,7 @@ import {DataFrame, Table} from '@rapidsai/cudf'; import {ParsedSchema} from './SQLTable'; +import type {defaultContextConfigValues} from './config'; /** @ignore */ export declare const _cpp_exports: any; @@ -37,7 +38,7 @@ export type ContextProps = { ucpContext?: UcpContext; // networkIfaceName: string; // workersUcpInfo: WorkerUcpInfo[]; - configOptions: Record; + configOptions: typeof defaultContextConfigValues; allocationMode: string; initialPoolSize: number | null; maximumPoolSize: number | null; @@ -68,7 +69,7 @@ export declare class ExecutionGraph { start(): void; result(): Promise<{names: string[], tables: Table[]}>; - sendTo(id: number, df: DataFrame[]): string[]; + sendTo(id: number, df: DataFrame[], nonce: string): string[]; } export declare class UcpContext {