Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support streaming in sqlQuery/txSqlQuery #41

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ jobs:
npm run build
- name: Run overview showcase
run: |
docker run -d --rm --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 --name immudb codenotary/immudb:1.4.1
npx ts-node --esm ./immudb-node-showcase/src/overview-showcase.ts
docker run -d --rm --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 --name immudb codenotary/immudb:1.9.3
npx tsx ./immudb-node-showcase/src/overview-showcase.ts
docker stop immudb
- name: Run sql showcase
run: |
docker run -d --rm --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 --name immudb codenotary/immudb:1.4.1
npx ts-node --esm ./immudb-node-showcase/src/sql-showcase.ts
docker run -d --rm --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 --name immudb codenotary/immudb:1.9.3
npx tsx ./immudb-node-showcase/src/sql-showcase.ts
docker stop immudb
- name: Run zSet showcase
run: |
docker run -d --rm --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 --name immudb codenotary/immudb:1.4.1
npx ts-node --esm ./immudb-node-showcase/src/zSet-showcase.ts
docker run -d --rm --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 --name immudb codenotary/immudb:1.9.3
npx tsx ./immudb-node-showcase/src/zSet-showcase.ts
docker stop immudb
5 changes: 3 additions & 2 deletions immudb-node-grpcjs/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ message SQLQueryRequest {
string sql = 1;
repeated NamedParam params = 2;
bool reuseSnapshot = 3;
bool acceptStream = 4;
}

message NamedParam {
Expand Down Expand Up @@ -842,7 +843,7 @@ service ImmuService {
rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){};

rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {};
rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {};
rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {};

rpc Login (LoginRequest) returns (LoginResponse){
option deprecated = true;
Expand Down Expand Up @@ -1153,7 +1154,7 @@ service ImmuService {
};
};

rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) {
rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {
option (google.api.http) = {
post: "/db/sqlquery"
body: "*"
Expand Down
28 changes: 10 additions & 18 deletions immudb-node-grpcjs/src/immudb/schema/ImmuService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,10 @@ export interface ImmuServiceClient extends grpc.Client {
sqlExec(argument: _immudb_schema_SQLExecRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLExecResult__Output>): grpc.ClientUnaryCall;
sqlExec(argument: _immudb_schema_SQLExecRequest, callback: grpc.requestCallback<_immudb_schema_SQLExecResult__Output>): grpc.ClientUnaryCall;

SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
SQLQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
sqlQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;

Scan(argument: _immudb_schema_ScanRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_Entries__Output>): grpc.ClientUnaryCall;
Scan(argument: _immudb_schema_ScanRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_Entries__Output>): grpc.ClientUnaryCall;
Expand Down Expand Up @@ -465,14 +461,10 @@ export interface ImmuServiceClient extends grpc.Client {
txSqlExec(argument: _immudb_schema_SQLExecRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_google_protobuf_Empty__Output>): grpc.ClientUnaryCall;
txSqlExec(argument: _immudb_schema_SQLExecRequest, callback: grpc.requestCallback<_google_protobuf_Empty__Output>): grpc.ClientUnaryCall;

TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;

TxScan(argument: _immudb_schema_TxScanRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_TxList__Output>): grpc.ClientUnaryCall;
TxScan(argument: _immudb_schema_TxScanRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_TxList__Output>): grpc.ClientUnaryCall;
Expand Down Expand Up @@ -750,7 +742,7 @@ export interface ImmuServiceHandlers extends grpc.UntypedServiceImplementation {

SQLExec: grpc.handleUnaryCall<_immudb_schema_SQLExecRequest__Output, _immudb_schema_SQLExecResult>;

SQLQuery: grpc.handleUnaryCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
SQLQuery: grpc.handleServerStreamingCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;

Scan: grpc.handleUnaryCall<_immudb_schema_ScanRequest__Output, _immudb_schema_Entries>;

Expand All @@ -764,7 +756,7 @@ export interface ImmuServiceHandlers extends grpc.UntypedServiceImplementation {

TxSQLExec: grpc.handleUnaryCall<_immudb_schema_SQLExecRequest__Output, _google_protobuf_Empty>;

TxSQLQuery: grpc.handleUnaryCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
TxSQLQuery: grpc.handleServerStreamingCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;

TxScan: grpc.handleUnaryCall<_immudb_schema_TxScanRequest__Output, _immudb_schema_TxList>;

Expand Down
2 changes: 2 additions & 0 deletions immudb-node-grpcjs/src/immudb/schema/SQLQueryRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ export interface SQLQueryRequest {
'sql'?: (string);
'params'?: (_immudb_schema_NamedParam)[];
'reuseSnapshot'?: (boolean);
'acceptStream'?: (boolean);
}

export interface SQLQueryRequest__Output {
'sql': (string);
'params': (_immudb_schema_NamedParam__Output)[];
'reuseSnapshot': (boolean);
'acceptStream': (boolean);
}
115 changes: 60 additions & 55 deletions immudb-node-showcase/src/overview-showcase.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
import Long from 'long'
import {
Client,
verifyVerification,
types,
stream,
Client,
verifyVerification,
types,
stream,
} from '@codenotary/immudb-node'





overviewSchowcase()
.catch(console.error)
.catch(console.error)


async function overviewSchowcase() {


const client = new Client({
host: '127.0.0.1',
port: 3322,
user: 'immudb',
password: 'immudb',
database: 'defaultdb',
host: '127.0.0.1',
port: 3322,
user: 'immudb',
password: 'immudb',
database: 'defaultdb',
})


Expand All @@ -35,8 +31,8 @@ async function overviewSchowcase() {

const valEntries2 = await client.setValEntries({
kvms: [
{key: Buffer.of(0), val: Buffer.of(0)},
{key: Buffer.of(1), val: Buffer.of(1)},
{ key: Buffer.of(0), val: Buffer.of(0) },
{ key: Buffer.of(1), val: Buffer.of(1) },
]
})
console.log('valEntries2:')
Expand All @@ -45,47 +41,47 @@ async function overviewSchowcase() {

const valEntry3 = await client.setValEntries({
kvms: [
{key: Buffer.of(2), val: Buffer.of(2)},
{ key: Buffer.of(2), val: Buffer.of(2) },
]
})
console.log('valEntry3:')
console.log(valEntry3)


const refEntry4 = await client.setRefEntry({
key: Buffer.of(3),
key: Buffer.of(3),
referToKey: valEntries2.valEntries[0].key,
keyTxId: valEntries2.valEntries[0].id,
boundRef: true,
keyTxId: valEntries2.valEntries[0].id,
boundRef: true,
})
console.log('refEntry4:')
console.log(refEntry4)


const zSetEntry5 = await client.setZSetEntry({
zSet: Buffer.of(4),
referredKey: valEntry3.valEntries[0].key,
referredKeyScore: 3,
zSet: Buffer.of(4),
referredKey: valEntry3.valEntries[0].key,
referredKeyScore: 3,
})
console.log('zSetEntry5:')
console.log(zSetEntry5)

const entries6 = await client.setValRefZSetEntries({
ops: [
{
type: 'val',
key: Buffer.of(2),
val: Buffer.of(6),
type: 'val',
key: Buffer.of(2),
val: Buffer.of(6),
},
{
type: 'ref',
key: Buffer.of(3),
type: 'ref',
key: Buffer.of(3),
referToKey: valEntry3.valEntries[0].key
},
{
type: 'zSet',
type: 'zSet',
referredKey: valEntries2.valEntries[1].key,
zSet: zSetEntry5.zSetTxEntry.zSet,
zSet: zSetEntry5.zSetTxEntry.zSet,
referredKeyScore: 9,
}
]
Expand Down Expand Up @@ -126,7 +122,8 @@ async function overviewSchowcase() {
console.log(stream.toKVEntries(Buffer.concat(buffs)))


const sqlExecCreateTable7 = await client.sqlExec({sql: `
const sqlExecCreateTable7 = await client.sqlExec({
sql: `
create table if not exists testtable (
id1 integer not null,
id2 varchar[3] null,
Expand All @@ -140,7 +137,8 @@ async function overviewSchowcase() {
console.log(sqlExecCreateTable7)


const sqlExecUpsert8 = await client.sqlExec({sql: `
const sqlExecUpsert8 = await client.sqlExec({
sql: `
upsert into testtable
(id1, id2, created, data, isactive)
values
Expand All @@ -153,15 +151,18 @@ async function overviewSchowcase() {


const sqlTxAt8 = await client.executeSqlTx('ReadWrite', async (txApi) => {
const sqlQueryInTxAt8 = await txApi.query({sql: `
const sqlQueryInTxAt8 = await txApi.query({
sql: `
select * from testtable;
`})
console.log('sqlQueryInTxAt8')
console.log(sqlQueryInTxAt8)

for await (const row of sqlQueryInTxAt8) {
console.log(row)
}

// sqlExecUpsert9
const sqlExecUpsertInTx9 = txApi.exec({sql:`
const sqlExecUpsertInTx9 = txApi.exec({
sql: `
upsert into testtable
(id1, id2, created, data, isactive)
values
Expand All @@ -170,11 +171,14 @@ async function overviewSchowcase() {
`})


const sqlQueryInTxAt9 = await txApi.query({sql: `
const sqlQueryInTxAt9 = await txApi.query({
sql: `
select * from testtable;
`})
console.log('sqlQueryInTxAt9')
console.log(sqlQueryInTxAt9)
for await (const row of sqlQueryInTxAt9) {
console.log(row)
}


throw 'I would like to cancel'
Expand All @@ -184,18 +188,19 @@ async function overviewSchowcase() {
console.log(sqlTxAt8)


const sqlQueryAt8 = await client.sqlQuery({sql: `
const sqlQueryAt8 = await client.sqlQuery({
sql: `
select * from testtable;
`})
console.log('sqlQueryInTxAt8')
console.log(sqlQueryAt8)
const k = sqlQueryAt8[0]
const d = k[0]


const row = await sqlQueryAt8.next()
console.log(row)


const dbScanAt8 = await client.scanDbEntries({
scanStartAtTxId: Long.fromValue(1, true),
scanStartAtTxId: Long.fromValue(1, true),
})
console.log('dbScanAt8')
console.log(dbScanAt8)
Expand All @@ -208,12 +213,12 @@ async function overviewSchowcase() {


const setAndProof9 = await client.setValEntriesGetVerification({
kvms: [{key: Buffer.from('yo'), val: Buffer.from('man')}],
kvms: [{ key: Buffer.from('yo'), val: Buffer.from('man') }],
refTxId: stateAt8.txId,
refHash: stateAt8.txHash,
})
console.log('setAndProof9')
console.dir(setAndProof9, {depth: 10})
console.dir(setAndProof9, { depth: 10 })

console.log('verifyVerification(setAndProof9) result:')
console.log(verifyVerification(setAndProof9.verification))
Expand All @@ -232,7 +237,7 @@ async function overviewSchowcase() {
refTxId: stateAt9.txId,
})
console.log('getTx2AndVerification')
console.log(getTx2AndVerification, {depth: 10})
console.log(getTx2AndVerification, { depth: 10 })
console.log('verifyVerification(getTx2AndVerification) result:')
console.log(verifyVerification(getTx2AndVerification.verification))

Expand All @@ -241,40 +246,40 @@ async function overviewSchowcase() {

// entries6
const getTx6AndVerification = await client.getTxAndVerification({
txId: entries6.tx.id,
txId: entries6.tx.id,
refHash: stateAt9.txHash,
refTxId: stateAt9.txId,
})
console.log('getTx6AndVerification')
console.log(getTx6AndVerification, {depth: 10})
console.log(getTx6AndVerification, { depth: 10 })
console.log('verifyVerification(getTx6AndVerification) result:')
console.log(verifyVerification(getTx6AndVerification.verification))



// sqlExecCreateTable7.subTxes[0].tx?.id
const getTx7AndVerification = await client.getTxAndVerification({
txId: Long.fromInt(7, true),
txId: Long.fromInt(7, true),
refHash: stateAt9.txHash,
refTxId: stateAt9.txId,
})
console.log('getTx7AndVerification')
console.log(getTx7AndVerification, {depth: 10})
console.log(getTx7AndVerification, { depth: 10 })
console.log('verifyVerification(getTx7AndVerification) result:')
console.log(verifyVerification(getTx7AndVerification.verification))


// sqlExecUpsert8
const getTx8AndVerification = await client.getTxAndVerification({
txId: Long.fromInt(8, true),
txId: Long.fromInt(8, true),
refHash: stateAt9.txHash,
refTxId: stateAt9.txId,
})
console.log('getTx8AndVerification')
console.log(getTx8AndVerification, {depth: 10})
console.log(getTx8AndVerification, { depth: 10 })
console.log('verifyVerification(getTx8AndVerification) result:')
console.log(verifyVerification(getTx8AndVerification.verification))




Expand Down
Loading
Loading