Skip to content

Commit

Permalink
Support streaming in sqlQuery/txSqlQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
ostafen committed May 21, 2024
1 parent 125cdcf commit bda7d90
Show file tree
Hide file tree
Showing 13 changed files with 555 additions and 480 deletions.
5 changes: 3 additions & 2 deletions immudb-node-grpcjs/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ message SQLQueryRequest {
string sql = 1;
repeated NamedParam params = 2;
bool reuseSnapshot = 3;
bool acceptStream = 4;
}

message NamedParam {
Expand Down Expand Up @@ -842,7 +843,7 @@ service ImmuService {
rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){};

rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {};
rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {};
rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {};

rpc Login (LoginRequest) returns (LoginResponse){
option deprecated = true;
Expand Down Expand Up @@ -1153,7 +1154,7 @@ service ImmuService {
};
};

rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) {
rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {
option (google.api.http) = {
post: "/db/sqlquery"
body: "*"
Expand Down
28 changes: 10 additions & 18 deletions immudb-node-grpcjs/src/immudb/schema/ImmuService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,10 @@ export interface ImmuServiceClient extends grpc.Client {
sqlExec(argument: _immudb_schema_SQLExecRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLExecResult__Output>): grpc.ClientUnaryCall;
sqlExec(argument: _immudb_schema_SQLExecRequest, callback: grpc.requestCallback<_immudb_schema_SQLExecResult__Output>): grpc.ClientUnaryCall;

SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;

Scan(argument: _immudb_schema_ScanRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_Entries__Output>): grpc.ClientUnaryCall;
Scan(argument: _immudb_schema_ScanRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_Entries__Output>): grpc.ClientUnaryCall;
Expand Down Expand Up @@ -465,14 +461,10 @@ export interface ImmuServiceClient extends grpc.Client {
txSqlExec(argument: _immudb_schema_SQLExecRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_google_protobuf_Empty__Output>): grpc.ClientUnaryCall;
txSqlExec(argument: _immudb_schema_SQLExecRequest, callback: grpc.requestCallback<_google_protobuf_Empty__Output>): grpc.ClientUnaryCall;

TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;

TxScan(argument: _immudb_schema_TxScanRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_TxList__Output>): grpc.ClientUnaryCall;
TxScan(argument: _immudb_schema_TxScanRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_TxList__Output>): grpc.ClientUnaryCall;
Expand Down Expand Up @@ -750,7 +742,7 @@ export interface ImmuServiceHandlers extends grpc.UntypedServiceImplementation {

SQLExec: grpc.handleUnaryCall<_immudb_schema_SQLExecRequest__Output, _immudb_schema_SQLExecResult>;

SQLQuery: grpc.handleUnaryCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
SQLQuery: grpc.handleServerStreamingCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;

Scan: grpc.handleUnaryCall<_immudb_schema_ScanRequest__Output, _immudb_schema_Entries>;

Expand All @@ -764,7 +756,7 @@ export interface ImmuServiceHandlers extends grpc.UntypedServiceImplementation {

TxSQLExec: grpc.handleUnaryCall<_immudb_schema_SQLExecRequest__Output, _google_protobuf_Empty>;

TxSQLQuery: grpc.handleUnaryCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
TxSQLQuery: grpc.handleServerStreamingCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;

TxScan: grpc.handleUnaryCall<_immudb_schema_TxScanRequest__Output, _immudb_schema_TxList>;

Expand Down
2 changes: 2 additions & 0 deletions immudb-node-grpcjs/src/immudb/schema/SQLQueryRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ export interface SQLQueryRequest {
'sql'?: (string);
'params'?: (_immudb_schema_NamedParam)[];
'reuseSnapshot'?: (boolean);
'acceptStream'?: (boolean);
}

export interface SQLQueryRequest__Output {
'sql': (string);
'params': (_immudb_schema_NamedParam__Output)[];
'reuseSnapshot': (boolean);
'acceptStream': (boolean);
}
102 changes: 54 additions & 48 deletions immudb-node-showcase/src/overview-showcase.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import Long from 'long'
import {
Client,
verifyVerification,
types,
stream,
Client,
verifyVerification,
types,
stream,
} from '@codenotary/immudb-node'





overviewSchowcase()
.catch(console.error)
.catch(console.error)


async function overviewSchowcase() {


const client = new Client({
host: '127.0.0.1',
port: 3322,
user: 'immudb',
password: 'immudb',
database: 'defaultdb',
host: '127.0.0.1',
port: 3322,
user: 'immudb',
password: 'immudb',
database: 'defaultdb',
})


Expand All @@ -35,8 +35,8 @@ async function overviewSchowcase() {

const valEntries2 = await client.setValEntries({
kvms: [
{key: Buffer.of(0), val: Buffer.of(0)},
{key: Buffer.of(1), val: Buffer.of(1)},
{ key: Buffer.of(0), val: Buffer.of(0) },
{ key: Buffer.of(1), val: Buffer.of(1) },
]
})
console.log('valEntries2:')
Expand All @@ -45,47 +45,47 @@ async function overviewSchowcase() {

const valEntry3 = await client.setValEntries({
kvms: [
{key: Buffer.of(2), val: Buffer.of(2)},
{ key: Buffer.of(2), val: Buffer.of(2) },
]
})
console.log('valEntry3:')
console.log(valEntry3)


const refEntry4 = await client.setRefEntry({
key: Buffer.of(3),
key: Buffer.of(3),
referToKey: valEntries2.valEntries[0].key,
keyTxId: valEntries2.valEntries[0].id,
boundRef: true,
keyTxId: valEntries2.valEntries[0].id,
boundRef: true,
})
console.log('refEntry4:')
console.log(refEntry4)


const zSetEntry5 = await client.setZSetEntry({
zSet: Buffer.of(4),
referredKey: valEntry3.valEntries[0].key,
referredKeyScore: 3,
zSet: Buffer.of(4),
referredKey: valEntry3.valEntries[0].key,
referredKeyScore: 3,
})
console.log('zSetEntry5:')
console.log(zSetEntry5)

const entries6 = await client.setValRefZSetEntries({
ops: [
{
type: 'val',
key: Buffer.of(2),
val: Buffer.of(6),
type: 'val',
key: Buffer.of(2),
val: Buffer.of(6),
},
{
type: 'ref',
key: Buffer.of(3),
type: 'ref',
key: Buffer.of(3),
referToKey: valEntry3.valEntries[0].key
},
{
type: 'zSet',
type: 'zSet',
referredKey: valEntries2.valEntries[1].key,
zSet: zSetEntry5.zSetTxEntry.zSet,
zSet: zSetEntry5.zSetTxEntry.zSet,
referredKeyScore: 9,
}
]
Expand Down Expand Up @@ -126,7 +126,8 @@ async function overviewSchowcase() {
console.log(stream.toKVEntries(Buffer.concat(buffs)))


const sqlExecCreateTable7 = await client.sqlExec({sql: `
const sqlExecCreateTable7 = await client.sqlExec({
sql: `
create table if not exists testtable (
id1 integer not null,
id2 varchar[3] null,
Expand All @@ -140,7 +141,8 @@ async function overviewSchowcase() {
console.log(sqlExecCreateTable7)


const sqlExecUpsert8 = await client.sqlExec({sql: `
const sqlExecUpsert8 = await client.sqlExec({
sql: `
upsert into testtable
(id1, id2, created, data, isactive)
values
Expand All @@ -153,15 +155,17 @@ async function overviewSchowcase() {


const sqlTxAt8 = await client.executeSqlTx('ReadWrite', async (txApi) => {
const sqlQueryInTxAt8 = await txApi.query({sql: `
const sqlQueryInTxAt8 = await txApi.query({
sql: `
select * from testtable;
`})
console.log('sqlQueryInTxAt8')
console.log(sqlQueryInTxAt8)


// sqlExecUpsert9
const sqlExecUpsertInTx9 = txApi.exec({sql:`
const sqlExecUpsertInTx9 = txApi.exec({
sql: `
upsert into testtable
(id1, id2, created, data, isactive)
values
Expand All @@ -170,7 +174,8 @@ async function overviewSchowcase() {
`})


const sqlQueryInTxAt9 = await txApi.query({sql: `
const sqlQueryInTxAt9 = await txApi.query({
sql: `
select * from testtable;
`})
console.log('sqlQueryInTxAt9')
Expand All @@ -184,18 +189,19 @@ async function overviewSchowcase() {
console.log(sqlTxAt8)


const sqlQueryAt8 = await client.sqlQuery({sql: `
const sqlQueryAt8 = await client.sqlQuery({
sql: `
select * from testtable;
`})
console.log('sqlQueryInTxAt8')
console.log(sqlQueryAt8)
const k = sqlQueryAt8[0]
const d = k[0]


const row = await sqlQueryAt8.next()
console.log(row)


const dbScanAt8 = await client.scanDbEntries({
scanStartAtTxId: Long.fromValue(1, true),
scanStartAtTxId: Long.fromValue(1, true),
})
console.log('dbScanAt8')
console.log(dbScanAt8)
Expand All @@ -208,12 +214,12 @@ async function overviewSchowcase() {


const setAndProof9 = await client.setValEntriesGetVerification({
kvms: [{key: Buffer.from('yo'), val: Buffer.from('man')}],
kvms: [{ key: Buffer.from('yo'), val: Buffer.from('man') }],
refTxId: stateAt8.txId,
refHash: stateAt8.txHash,
})
console.log('setAndProof9')
console.dir(setAndProof9, {depth: 10})
console.dir(setAndProof9, { depth: 10 })

console.log('verifyVerification(setAndProof9) result:')
console.log(verifyVerification(setAndProof9.verification))
Expand All @@ -232,7 +238,7 @@ async function overviewSchowcase() {
refTxId: stateAt9.txId,
})
console.log('getTx2AndVerification')
console.log(getTx2AndVerification, {depth: 10})
console.log(getTx2AndVerification, { depth: 10 })
console.log('verifyVerification(getTx2AndVerification) result:')
console.log(verifyVerification(getTx2AndVerification.verification))

Expand All @@ -241,40 +247,40 @@ async function overviewSchowcase() {

// entries6
const getTx6AndVerification = await client.getTxAndVerification({
txId: entries6.tx.id,
txId: entries6.tx.id,
refHash: stateAt9.txHash,
refTxId: stateAt9.txId,
})
console.log('getTx6AndVerification')
console.log(getTx6AndVerification, {depth: 10})
console.log(getTx6AndVerification, { depth: 10 })
console.log('verifyVerification(getTx6AndVerification) result:')
console.log(verifyVerification(getTx6AndVerification.verification))



// sqlExecCreateTable7.subTxes[0].tx?.id
const getTx7AndVerification = await client.getTxAndVerification({
txId: Long.fromInt(7, true),
txId: Long.fromInt(7, true),
refHash: stateAt9.txHash,
refTxId: stateAt9.txId,
})
console.log('getTx7AndVerification')
console.log(getTx7AndVerification, {depth: 10})
console.log(getTx7AndVerification, { depth: 10 })
console.log('verifyVerification(getTx7AndVerification) result:')
console.log(verifyVerification(getTx7AndVerification.verification))


// sqlExecUpsert8
const getTx8AndVerification = await client.getTxAndVerification({
txId: Long.fromInt(8, true),
txId: Long.fromInt(8, true),
refHash: stateAt9.txHash,
refTxId: stateAt9.txId,
})
console.log('getTx8AndVerification')
console.log(getTx8AndVerification, {depth: 10})
console.log(getTx8AndVerification, { depth: 10 })
console.log('verifyVerification(getTx8AndVerification) result:')
console.log(verifyVerification(getTx8AndVerification.verification))




Expand Down
Loading

0 comments on commit bda7d90

Please sign in to comment.