From f358ae4bddfc4e472221dd3baf1349b9e86f6f1e Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Mon, 20 May 2024 10:45:44 +0200 Subject: [PATCH] Support stream in SQLQuery/TxSQLQuery --- .../io/codenotary/immudb4j/ImmuClient.java | 3 ++- .../immudb4j/sql/SQLQueryResult.java | 26 +++++++++++++------ src/main/proto/schema.proto | 4 +-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/codenotary/immudb4j/ImmuClient.java b/src/main/java/io/codenotary/immudb4j/ImmuClient.java index dbf508b..1ef147b 100644 --- a/src/main/java/io/codenotary/immudb4j/ImmuClient.java +++ b/src/main/java/io/codenotary/immudb4j/ImmuClient.java @@ -371,7 +371,8 @@ public synchronized SQLQueryResult sqlQuery(String stmt, Map p .addAllParams(sqlEncodeParams(params)) .build(); - return new SQLQueryResult(blockingStub.txSQLQuery(req)); + Iterator it = blockingStub.txSQLQuery(req); + return new SQLQueryResult(it); } private Map sqlNameParams(SQLValue... params) { diff --git a/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java b/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java index 3348d11..8d01cc9 100644 --- a/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java +++ b/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java @@ -17,23 +17,27 @@ package io.codenotary.immudb4j.sql; import java.util.Date; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import java.util.Iterator; import io.codenotary.immudb.ImmudbProto; public class SQLQueryResult { - + + private final Iterator it; private ImmudbProto.SQLQueryResult res; private int currRow = -1; private boolean closed; - public SQLQueryResult(ImmudbProto.SQLQueryResult res) { - if (res == null) { + public SQLQueryResult(Iterator it) { + if (it == null) { throw new RuntimeException("illegal arguments"); } - this.res = res; + this.it = it; + this.res = it.next(); } public synchronized void close() throws SQLException { @@ -45,12 +49,18 @@ public synchronized boolean next() throws SQLException { throw new SQLException("already closed"); } - if (currRow + 1 >= res.getRowsCount()) { + if (res != null && currRow+1 < res.getRowsCount()) { + currRow++; + return true; + } + + try { + res = this.it.next(); + } catch (NoSuchElementException e) { return false; } + currRow = 0; - currRow++; - return true; } @@ -76,7 +86,7 @@ public synchronized int getColumnsCount() throws SQLException { if (closed) { throw new SQLException("already closed"); } - + return res.getColumnsCount(); } diff --git a/src/main/proto/schema.proto b/src/main/proto/schema.proto index 26d7828..def0eff 100644 --- a/src/main/proto/schema.proto +++ b/src/main/proto/schema.proto @@ -783,7 +783,7 @@ service ImmuService { rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){}; rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {}; - rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {}; + rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {}; rpc Set (SetRequest) returns (TxHeader){ }; @@ -894,7 +894,7 @@ service ImmuService { rpc SQLExec(SQLExecRequest) returns (SQLExecResult) { }; - rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) { + rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) { }; rpc ListTables(google.protobuf.Empty) returns (SQLQueryResult) {