[FIX] install @rapidsai/sql module's .jar files into runtime images (#431)

* install sql .jar files into runtime images

* default to TCP until we can figure out UCX issues

* update types

* make cluster workers retain their in-flight tables until they're received by the main proc, then dispose them immediately after to avoid OOMs

* add a nonce to help ensure worker in-flight message names don't conflict

* add logging to the sql demo, fix a UI bug
trxcllnt authored Sep 16, 2022
1 parent a7b0c30 commit a92b35d
Showing 13 changed files with 130 additions and 60 deletions.
9 changes: 7 additions & 2 deletions dev/dockerfiles/runtime/demo.Dockerfile
@@ -34,7 +34,12 @@ RUN --mount=type=bind,from=build,source=/opt/rapids/,target=/tmp/rapids/ \
-f /tmp/rapids/rapidsai_${x}-*-Linux.tar.gz \
--wildcards --strip-components=2 \
-x "**/lib/rapidsai_${x}.node" ; \
done
done; \
tar -C node_modules/@rapidsai/sql/build/Release \
-f /tmp/rapids/rapidsai_sql-*.tar.gz \
--wildcards --strip-components=2 \
-x "*/blazingsql-*.jar" ;


FROM scratch as ucx-deb-amd64

@@ -66,7 +71,7 @@ RUN --mount=type=bind,from=ucx-deb,target=/usr/src/ucx \
# GLEW dependencies
libglvnd0 libgl1 libglx0 libegl1 libgles2 libglu1-mesa \
# UCX runtime dependencies
libibverbs1 librdmacm1 libnuma1 \
libibverbs1 librdmacm1 libnuma1 numactl \
# node-canvas dependencies
libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libjpeg8 libgif7 librsvg2-2 \
# SQL dependencies
8 changes: 6 additions & 2 deletions dev/dockerfiles/runtime/main.Dockerfile
@@ -33,7 +33,11 @@ RUN --mount=type=bind,from=build,source=/opt/rapids/,target=/tmp/rapids/ \
-f /tmp/rapids/rapidsai_${x}-*-Linux.tar.gz \
--wildcards --strip-components=2 \
-x "**/lib/rapidsai_${x}.node" ; \
done
done; \
tar -C node_modules/@rapidsai/sql/build/Release \
-f /tmp/rapids/rapidsai_sql-*.tar.gz \
--wildcards --strip-components=2 \
-x "*/blazingsql-*.jar" ;

FROM scratch as ucx-deb-amd64

@@ -65,7 +69,7 @@ RUN --mount=type=bind,from=ucx-deb,target=/usr/src/ucx \
# GLEW dependencies
libglvnd0 libgl1 libglx0 libegl1 libgles2 libglu1-mesa \
# UCX runtime dependencies
libibverbs1 librdmacm1 libnuma1 \
libibverbs1 librdmacm1 libnuma1 numactl \
# node-canvas dependencies
libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libjpeg8 libgif7 librsvg2-2 \
# SQL dependencies
8 changes: 6 additions & 2 deletions dev/dockerfiles/runtime/sql.Dockerfile
@@ -24,7 +24,11 @@ RUN --mount=type=bind,from=build,source=/opt/rapids/,target=/tmp/rapids/ \
-f /tmp/rapids/rapidsai_${x}-*-Linux.tar.gz \
--wildcards --strip-components=2 \
-x "**/lib/rapidsai_${x}.node" ; \
done
done; \
tar -C node_modules/@rapidsai/sql/build/Release \
-f /tmp/rapids/rapidsai_sql-*.tar.gz \
--wildcards --strip-components=2 \
-x "*/blazingsql-*.jar" ;

FROM scratch as ucx-deb-amd64

@@ -46,7 +50,7 @@ RUN --mount=type=bind,from=ucx-deb,target=/usr/src/ucx \
&& apt update \
&& apt install -y --no-install-recommends \
# UCX runtime dependencies
libibverbs1 librdmacm1 libnuma1 \
libibverbs1 librdmacm1 libnuma1 numactl \
# SQL dependencies
openjdk-8-jre-headless libboost-regex-dev libboost-system-dev libboost-filesystem-dev \
# Install UCX
24 changes: 14 additions & 10 deletions modules/demo/sql/sql-cluster-server/components/querydashboard.jsx
@@ -37,26 +37,30 @@ const columns = [
{ id: 'text', label: 'Text', minWidth: 500 }
];

/**
*
* @param {import('apache-arrow').Table} table
*/
function formatData(table) {
let rows = [];
if (table.length == 0) {
return rows;
}

const resultsToDisplay = table.length < MAX_RESULTS_TO_DISPLAY ? table.length : MAX_RESULTS_TO_DISPLAY;
const ids = [...table.getColumn("id")].map((x) => +x).slice(0, resultsToDisplay);
const revids = [...table.getColumn("revid")].map((x) => +x).slice(0, resultsToDisplay);
const urls = [...table.getColumn("url")].slice(0, resultsToDisplay);
const titles = [...table.getColumn("title")].slice(0, resultsToDisplay);
const texts = [...table.getColumn("text")].slice(0, resultsToDisplay);
const ids = [...table.getChild("id") || []].map(Number).slice(0, resultsToDisplay);
const revids = [...table.getChild("revid") || []].map(Number).slice(0, resultsToDisplay);
const urls = [...table.getChild("url") || []].slice(0, resultsToDisplay);
const titles = [...table.getChild("title") || []].slice(0, resultsToDisplay);
const texts = [...table.getChild("text") || []].slice(0, resultsToDisplay);

for (let i = 0; i < resultsToDisplay; ++i) {
rows.push({
id: ids[i],
revid: revids[i],
url: urls[i],
title: titles[i],
text: texts[i]
id: ids[i] || '',
revid: revids[i] || '',
url: urls[i] || '',
title: titles[i] || '',
text: texts[i] || '',
});
}

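Side note on the API shift above: newer apache-arrow releases replace Table.getColumn() with Table.getChild(), which returns null for a missing column, hence the `|| []` guards in the dashboard code. Below is a minimal sketch of that guarded-read pattern, assuming a recent apache-arrow release; `readColumn` and the demo data are made up for illustration.

```ts
import {tableFromArrays, Table} from 'apache-arrow';

// Read up to `limit` values from a named column; getChild() returns null for a
// missing column, so fall back to an empty iterable instead of throwing.
function readColumn(table: Table, name: string, limit: number): unknown[] {
  return [...(table.getChild(name) ?? [])].slice(0, limit);
}

const demo = tableFromArrays({id: Int32Array.from([1, 2, 3]), title: ['a', 'b', 'c']});
console.log(readColumn(demo, 'id', 2).map(Number));  // [1, 2]
console.log(readColumn(demo, 'missing', 2));         // []
```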
72 changes: 43 additions & 29 deletions modules/demo/sql/sql-cluster-server/index.js
@@ -14,11 +14,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

const fs = require('fs');
const {performance} = require('perf_hooks');
const {SQLCluster} = require('@rapidsai/sql');
const {DataFrame} = require('@rapidsai/cudf');
const {RecordBatchStreamWriter} = require('apache-arrow');
const fs = require('fs');
const {SQLCluster} = require('@rapidsai/sql');
const {DataFrame, scope} = require('@rapidsai/cudf');

const fastify = require('fastify')({
pluginTimeout: 30000,
@@ -48,7 +48,24 @@ fastify.register((require('fastify-arrow')))
dev: process.env.NODE_ENV !== 'production',
}))
.register(async (instance, opts, done) => {
sqlCluster = await SQLCluster.init({numWorkers: 10});
const logPath = `${__dirname}/.logs`;
require('rimraf').sync(logPath);
fs.mkdirSync(logPath, {recursive: true});

sqlCluster = await SQLCluster.init({
numWorkers: Infinity,
enableLogging: true,
// allocationMode: 'pool_memory_resource',
configOptions: {
PROTOCOL: 'TCP',
ENABLE_TASK_LOGS: true,
ENABLE_COMMS_LOGS: true,
ENABLE_OTHER_ENGINE_LOGS: true,
ENABLE_GENERAL_ENGINE_LOGS: true,
BLAZING_LOGGING_DIRECTORY: logPath,
BLAZING_LOCAL_LOGGING_DIRECTORY: logPath,
}
});
await sqlCluster.createCSVTable('test_table', DATA_PATHS);
done();
})
@@ -57,23 +74,21 @@ fastify.register((require('fastify-arrow')))
fastify.post('/run_query', async function(request, reply) {
try {
request.log.info({query: request.body}, `calling sqlCluster.sql()`);
const t0 = performance.now();
const dfs = await sqlCluster.sql(request.body).catch((err) => {
request.log.error({err}, `Error calling sqlCluster.sql`);
return new DataFrame();
});
const t1 = performance.now();
const queryTime = t1 - t0;

const {results, resultCount} = head(dfs, 500);
const arrowTable = results.toArrow();
arrowTable.schema.metadata.set('queryTime', queryTime);
arrowTable.schema.metadata.set('queryResults', resultCount);
RecordBatchStreamWriter.writeAll(arrowTable).pipe(reply.stream());
await scope(async () => {
const t0 = performance.now();
const dfs = await sqlCluster.sql(request.body).catch((err) => {
request.log.error({err}, `Error calling sqlCluster.sql`);
return new DataFrame();
});
const t1 = performance.now();
const queryTime = t1 - t0;

// TODO: remove these calls to dispose once scope() supports async
results.dispose();
dfs.forEach((df) => df.dispose());
const {result, rowCount} = head(dfs, 500);
const arrowTable = result.toArrow();
arrowTable.schema.metadata.set('queryTime', queryTime);
arrowTable.schema.metadata.set('queryResults', rowCount);
RecordBatchStreamWriter.writeAll(arrowTable).pipe(reply.stream());
});
} catch (err) {
request.log.error({err}, '/run_query error');
reply.code(500).send(err);
@@ -91,16 +106,15 @@ function head(dfs, rows) {
let rowCount = 0;

for (let i = 0; i < dfs.length; ++i) {
if (dfs[i].numRows == 0) continue;
rowCount += dfs[i].numRows;
if (result.numRows <= rows) {
const head = dfs[i].head(rows - result.numRows);
result = result.concat(head);

// TODO: remove this call to dispose once scope() supports async
head.dispose();
if (dfs[i].numRows > 0) {
rowCount += dfs[i].numRows;
if (result.numRows <= rows) {
result = scope(() => { //
return result.concat(dfs[i].head(rows - result.numRows));
});
}
}
}

return {results: result, resultCount: rowCount};
return {result, rowCount};
}
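The handler and head() above now lean on scope() from @rapidsai/cudf, so temporaries are disposed as soon as the scoped callback returns and the earlier manual dispose() calls (and their TODOs) go away; the new scope() also accepts async callbacks, which is why the /run_query body is awaited inside one. The sketch below only illustrates the resource-scoping idea with an explicit `track` callback; it is not the library's implementation, which tracks cudf allocations for you.

```ts
// Illustrative scope(): dispose everything registered inside it on the way out.
interface Disposable { dispose(): void; }

function scope<T>(fn: (track: <R extends Disposable>(r: R) => R) => T): T {
  const tracked: Disposable[] = [];
  const track = <R extends Disposable>(r: R): R => { tracked.push(r); return r; };
  try {
    return fn(track);
  } finally {
    tracked.forEach((r) => r.dispose());  // temporaries freed here
  }
}

// Mock object standing in for @rapidsai/cudf's DataFrame.
const df = {
  head(n: number) {
    return {numRows: n, dispose() { console.log(`disposed head(${n})`); }};
  },
};

// The head(500) temporary is disposed when the scope exits; only the number survives.
const rows = scope((track) => track(df.head(500)).numRows);
console.log(rows);  // 500 (printed after "disposed head(500)")
```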
6 changes: 4 additions & 2 deletions modules/sql/blazingsql/graph.cpp
@@ -85,6 +85,7 @@ Napi::Value ExecutionGraph::send(Napi::CallbackInfo const& info) {

int32_t dst_ral_id = args[0];
Napi::Array data_frames = args[1];
int32_t nonce = args[2];

auto messages = Napi::Array::New(env, data_frames.Length());
for (int i = 0; i < data_frames.Length(); ++i) {
@@ -97,8 +98,9 @@
auto ctx_token =
std::to_string(_graph->get_last_kernel()->input_cache()->get_context()->getContextToken());

std::string message = "broadcast_table_message_" + std::to_string(i);
messages[i] = message;
std::string message =
"broadcast_table_message_" + std::to_string(nonce) + "_" + std::to_string(i);
messages[i] = message;

_context.Value()->send(dst_ral_id, ctx_token, message, names, *table);
}
6 changes: 3 additions & 3 deletions modules/sql/src/cluster.ts
@@ -1,4 +1,4 @@
// Copyright (c) 2021, NVIDIA CORPORATION.
// Copyright (c) 2021-2022, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@ export interface Worker {
createContext(props: Omit<ContextProps, 'id'>): Promise<void>;
}

export interface ClusterProps {
export interface ClusterProps extends ContextProps {
ip: string;
port: number;
numWorkers: number;
@@ -54,7 +54,7 @@ export class SQLCluster {
* const cluster = await Cluster.init();
* ```
*/
public static async init(options: Partial<ClusterProps>&Partial<ContextProps> = {}) {
public static async init(options: Partial<ClusterProps> = {}) {
const {numWorkers = Device.numDevices, ip = '0.0.0.0', port = 4000} = options;
const {
networkIfaceName = 'lo',
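With ClusterProps now extending ContextProps, context-level settings such as configOptions can be passed straight to SQLCluster.init(), as the demo server earlier in this commit does. A small usage sketch follows; the worker count, option values, and CSV path are illustrative, not defaults or required values.

```ts
import {SQLCluster} from '@rapidsai/sql';

async function main() {
  const cluster = await SQLCluster.init({
    numWorkers: 2,  // illustrative; defaults to the number of visible GPUs
    configOptions: {
      PROTOCOL: 'TCP',  // now the cluster default (previously 'UCX')
      ENABLE_TASK_LOGS: true,
    },
  });
  await cluster.createCSVTable('test_table', ['wikipedia_pages.csv']);  // hypothetical path
  const dfs = await cluster.sql(`SELECT title FROM test_table LIMIT 5`);
  dfs.forEach((df) => console.log(df.numRows));
}

main().catch(console.error);
```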
11 changes: 9 additions & 2 deletions modules/sql/src/cluster/remote.ts
@@ -86,8 +86,15 @@ export class RemoteSQLWorker implements Worker {

public sql(query: string, token: number) {
return this._send({type: 'sql', query, token, destinationId: this._cluster.context.id})
.then(({messageIds}: {messageIds: string[]}) =>
Promise.all(messageIds.map((id: string) => this._cluster.context.pull(id))));
.then(({messageIds}: {messageIds: string[]}) => {
return Promise.all(messageIds.map((messageId: string) => {
return this._cluster.context.pull(messageId).then((df) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this._send({type: 'release', messageId});
return df;
});
}));
});
}

private _send({type, ...rest}: any = {}) {
16 changes: 15 additions & 1 deletion modules/sql/src/cluster/worker.ts
@@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import {DataFrame} from '@rapidsai/cudf';

import {ContextProps, UcpContext} from '../addon';
import {SQLContext} from '../context';

let context: SQLContext;
const allInFlightTables: Record<string, DataFrame> = {};

function die({code = 0}: any) { process.exit(code); }

@@ -45,7 +48,17 @@ function createORCTable({name, paths}: {name: string, paths: string[]}) {

async function sql({query, token, destinationId}:
{uuid: string, query: string, token: number; destinationId: number}) {
return {messageIds: await context.sql(query, token).sendTo(destinationId)};
const newInFlightTables = await context.sql(query, token).sendTo(destinationId);
Object.assign(allInFlightTables, newInFlightTables);
return {messageIds: Object.keys(newInFlightTables)};
}

function release({messageId}: {messageId: string}) {
const df = allInFlightTables[messageId];
if (df) {
delete allInFlightTables[messageId];
df.dispose();
}
}

process.on('message', ({type, ...opts}: any) => {
@@ -55,6 +68,7 @@ process.on('message', ({type, ...opts}: any) => {
case 'kill': return die(opts);
case 'init': return init(opts);
case 'sql': return await sql(opts);
case 'release': return release(opts);
case 'dropTable': return dropTable(opts);
case 'createDataFrameTable': return await createDataFrameTable(opts);
case 'createCSVTable': return createCSVTable(opts);
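Paired with the RemoteSQLWorker.sql() change earlier, the lifecycle is: the worker registers every table it sends in allInFlightTables, the main process pulls each messageId and then posts a 'release', and the worker disposes that table, so nothing lingers long enough to cause an OOM. Below is a condensed, illustrative round trip; the real code moves these calls over process IPC and UCX/TCP pulls rather than direct function calls.

```ts
// Simplified stand-in; DataFrameLike is only the slice of the interface we need here.
interface DataFrameLike { dispose(): void; }

const allInFlightTables: Record<string, DataFrameLike> = {};

// Worker side: retain sent tables until the main process confirms receipt.
function workerSql(sent: Record<string, DataFrameLike>): string[] {
  Object.assign(allInFlightTables, sent);
  return Object.keys(sent);  // the messageIds handed back to the main process
}

function workerRelease(messageId: string): void {
  const df = allInFlightTables[messageId];
  if (df) {
    delete allInFlightTables[messageId];
    df.dispose();
  }
}

// Main-process side: pull each message, then release it immediately after.
async function mainSql(messageIds: string[],
                       pull: (id: string) => Promise<DataFrameLike>): Promise<DataFrameLike[]> {
  return Promise.all(messageIds.map(async (id) => {
    const df = await pull(id);
    workerRelease(id);  // in the real code this is a {type: 'release', messageId} IPC message
    return df;
  }));
}
```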
4 changes: 2 additions & 2 deletions modules/sql/src/config.ts
@@ -1,4 +1,4 @@
// Copyright (c) 2021, NVIDIA CORPORATION.
// Copyright (c) 2021-2022, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -48,5 +48,5 @@ export const defaultContextConfigValues = {

export const defaultClusterConfigValues = {
...defaultContextConfigValues,
PROTOCOL: 'UCX'
PROTOCOL: 'TCP'
};
2 changes: 1 addition & 1 deletion modules/sql/src/context.ts
@@ -40,7 +40,7 @@ export class SQLContext {
declare private _schema: any;
declare private _generator: any;
declare private _tables: Map<string, SQLTable>;
declare private _configOptions: Record<string, unknown>;
declare private _configOptions: typeof defaultContextConfigValues;

constructor(options: Partial<ContextProps> = {}) {
this._db = CatalogDatabaseImpl('main');
19 changes: 17 additions & 2 deletions modules/sql/src/graph.ts
@@ -1,4 +1,4 @@
// Copyright (c) 2021, NVIDIA CORPORATION.
// Copyright (c) 2021-2022, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,11 +16,15 @@

import {DataFrame, Table} from '@rapidsai/cudf';

let nonce = Math.random() * 1e3 | 0;

export class ExecutionGraph {
constructor(private _graph?: import('./rapidsai_sql').ExecutionGraph) {}

start(): void { this._graph?.start(); }

then() { return this.result(); }

async result() {
const {names, tables} =
this._graph ? (await this._graph.result()) : {names: [], tables: [new Table({})]};
@@ -33,5 +37,16 @@
return results;
}

async sendTo(id: number) { return await this.result().then((df) => this._graph?.sendTo(id, df)); }
async sendTo(id: number) {
return await this.result().then((dfs) => {
const {_graph} = this;
const inFlightTables: Record<string, DataFrame> = {};
if (_graph) {
_graph.sendTo(id, dfs, `${nonce++}`).forEach((messageId, i) => { //
inFlightTables[messageId] = dfs[i];
});
}
return inFlightTables;
});
}
}
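The module-level nonce here, combined with the graph.cpp change above, makes every sendTo() call produce message names unique to that call, so two queries in flight at the same time cannot both claim "broadcast_table_message_0". A tiny illustration of the naming scheme; the helper name is made up for the example.

```ts
// Per-process counter seeded randomly, as in graph.ts above.
let nonce = Math.random() * 1e3 | 0;

// Hypothetical helper: message names for one sendTo() call over `tableCount` tables.
function messageIdsFor(tableCount: number): string[] {
  const n = nonce++;  // every call gets its own nonce value
  return Array.from({length: tableCount}, (_, i) => `broadcast_table_message_${n}_${i}`);
}

console.log(messageIdsFor(2));  // e.g. ["broadcast_table_message_412_0", "broadcast_table_message_412_1"]
console.log(messageIdsFor(2));  // e.g. ["broadcast_table_message_413_0", "broadcast_table_message_413_1"]
```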