diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 539d701..ff54fc1 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -21,7 +21,7 @@ jobs: with: java-version: 1.8 - name: Start immudb container - run: docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 codenotary/immudb:1.4.0 + run: docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 codenotary/immudb:1.9.4 - name: Grant execute permission for gradlew run: chmod +x gradlew - name: Build with Gradle diff --git a/src/main/java/io/codenotary/immudb4j/ImmuClient.java b/src/main/java/io/codenotary/immudb4j/ImmuClient.java index dbf508b..f215f7e 100644 --- a/src/main/java/io/codenotary/immudb4j/ImmuClient.java +++ b/src/main/java/io/codenotary/immudb4j/ImmuClient.java @@ -369,9 +369,11 @@ public synchronized SQLQueryResult sqlQuery(String stmt, Map p final ImmudbProto.SQLQueryRequest req = ImmudbProto.SQLQueryRequest.newBuilder() .setSql(stmt) .addAllParams(sqlEncodeParams(params)) + .setAcceptStream(true) .build(); - return new SQLQueryResult(blockingStub.txSQLQuery(req)); + Iterator it = blockingStub.txSQLQuery(req); + return new SQLQueryResult(it); } private Map sqlNameParams(SQLValue... params) { diff --git a/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java b/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java index 3348d11..8d01cc9 100644 --- a/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java +++ b/src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java @@ -17,23 +17,27 @@ package io.codenotary.immudb4j.sql; import java.util.Date; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import java.util.Iterator; import io.codenotary.immudb.ImmudbProto; public class SQLQueryResult { - + + private final Iterator it; private ImmudbProto.SQLQueryResult res; private int currRow = -1; private boolean closed; - public SQLQueryResult(ImmudbProto.SQLQueryResult res) { - if (res == null) { + public SQLQueryResult(Iterator it) { + if (it == null) { throw new RuntimeException("illegal arguments"); } - this.res = res; + this.it = it; + this.res = it.next(); } public synchronized void close() throws SQLException { @@ -45,12 +49,18 @@ public synchronized boolean next() throws SQLException { throw new SQLException("already closed"); } - if (currRow + 1 >= res.getRowsCount()) { + if (res != null && currRow+1 < res.getRowsCount()) { + currRow++; + return true; + } + + try { + res = this.it.next(); + } catch (NoSuchElementException e) { return false; } + currRow = 0; - currRow++; - return true; } @@ -76,7 +86,7 @@ public synchronized int getColumnsCount() throws SQLException { if (closed) { throw new SQLException("already closed"); } - + return res.getColumnsCount(); } diff --git a/src/main/proto/schema.proto b/src/main/proto/schema.proto index 26d7828..9c678d1 100644 --- a/src/main/proto/schema.proto +++ b/src/main/proto/schema.proto @@ -685,6 +685,7 @@ message SQLQueryRequest { string sql = 1; repeated NamedParam params = 2; bool reuseSnapshot = 3; + bool acceptStream = 4; } message NamedParam { @@ -783,7 +784,7 @@ service ImmuService { rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){}; rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {}; - rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {}; + rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {}; rpc Set (SetRequest) returns (TxHeader){ }; @@ -894,7 +895,7 @@ service ImmuService { rpc SQLExec(SQLExecRequest) returns (SQLExecResult) { }; - rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) { + rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) { }; rpc ListTables(google.protobuf.Empty) returns (SQLQueryResult) { diff --git a/src/test/java/io/codenotary/immudb4j/SQLTransactionsTest.java b/src/test/java/io/codenotary/immudb4j/SQLTransactionsTest.java index 3381481..c008310 100644 --- a/src/test/java/io/codenotary/immudb4j/SQLTransactionsTest.java +++ b/src/test/java/io/codenotary/immudb4j/SQLTransactionsTest.java @@ -42,6 +42,9 @@ public void t1() throws VerificationException, InterruptedException, SQLExceptio new SQLValue(String.format("title%d", i)), new SQLValue(i % 2 == 0)); } + immuClient.commitTransaction(); + + immuClient.beginTransaction(); SQLQueryResult res = immuClient.sqlQuery("SELECT id, title, active FROM mytable"); @@ -71,8 +74,6 @@ public void t1() throws VerificationException, InterruptedException, SQLExceptio res.close(); immuClient.commitTransaction(); - - immuClient.closeSession(); } }