Skip to content

Commit

Permalink
Support stream in SQLQuery/TxSQLQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
ostafen committed May 20, 2024
1 parent 032334c commit f358ae4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/codenotary/immudb4j/ImmuClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ public synchronized SQLQueryResult sqlQuery(String stmt, Map<String, SQLValue> p
.addAllParams(sqlEncodeParams(params))
.build();

return new SQLQueryResult(blockingStub.txSQLQuery(req));
Iterator<io.codenotary.immudb.ImmudbProto.SQLQueryResult> it = blockingStub.txSQLQuery(req);
return new SQLQueryResult(it);
}

private Map<String, SQLValue> sqlNameParams(SQLValue... params) {
Expand Down
26 changes: 18 additions & 8 deletions src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@
package io.codenotary.immudb4j.sql;

import java.util.Date;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;

import io.codenotary.immudb.ImmudbProto;

public class SQLQueryResult {


private final Iterator<ImmudbProto.SQLQueryResult> it;
private ImmudbProto.SQLQueryResult res;
private int currRow = -1;

private boolean closed;

public SQLQueryResult(ImmudbProto.SQLQueryResult res) {
if (res == null) {
public SQLQueryResult(Iterator<ImmudbProto.SQLQueryResult> it) {
if (it == null) {
throw new RuntimeException("illegal arguments");
}

this.res = res;
this.it = it;
this.res = it.next();
}

public synchronized void close() throws SQLException {
Expand All @@ -45,12 +49,18 @@ public synchronized boolean next() throws SQLException {
throw new SQLException("already closed");
}

if (currRow + 1 >= res.getRowsCount()) {
if (res != null && currRow+1 < res.getRowsCount()) {
currRow++;
return true;
}

try {
res = this.it.next();
} catch (NoSuchElementException e) {
return false;
}
currRow = 0;

currRow++;

return true;
}

Expand All @@ -76,7 +86,7 @@ public synchronized int getColumnsCount() throws SQLException {
if (closed) {
throw new SQLException("already closed");
}

return res.getColumnsCount();
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ service ImmuService {
rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){};

rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {};
rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {};
rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {};

rpc Set (SetRequest) returns (TxHeader){
};
Expand Down Expand Up @@ -894,7 +894,7 @@ service ImmuService {
rpc SQLExec(SQLExecRequest) returns (SQLExecResult) {
};

rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) {
rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {
};

rpc ListTables(google.protobuf.Empty) returns (SQLQueryResult) {
Expand Down

0 comments on commit f358ae4

Please sign in to comment.