Skip to content

Commit

Permalink
Merge pull request #63 from codenotary/update-streaming-rpc
Browse files Browse the repository at this point in the history
Support stream in SQLQuery/TxSQLQuery
  • Loading branch information
ostafen authored Aug 10, 2024
2 parents 3bd9585 + 003bfe0 commit d7c8357
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
with:
java-version: 1.8
- name: Start immudb container
run: docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 codenotary/immudb:1.4.0
run: docker run -d --health-cmd "immuadmin status" --health-interval 10s --health-timeout 5s --health-retries 5 -p 3322:3322 codenotary/immudb:1.9.4
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build with Gradle
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/codenotary/immudb4j/ImmuClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,11 @@ public synchronized SQLQueryResult sqlQuery(String stmt, Map<String, SQLValue> p
final ImmudbProto.SQLQueryRequest req = ImmudbProto.SQLQueryRequest.newBuilder()
.setSql(stmt)
.addAllParams(sqlEncodeParams(params))
.setAcceptStream(true)
.build();

return new SQLQueryResult(blockingStub.txSQLQuery(req));
Iterator<io.codenotary.immudb.ImmudbProto.SQLQueryResult> it = blockingStub.txSQLQuery(req);
return new SQLQueryResult(it);
}

private Map<String, SQLValue> sqlNameParams(SQLValue... params) {
Expand Down
26 changes: 18 additions & 8 deletions src/main/java/io/codenotary/immudb4j/sql/SQLQueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@
package io.codenotary.immudb4j.sql;

import java.util.Date;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;

import io.codenotary.immudb.ImmudbProto;

public class SQLQueryResult {


private final Iterator<ImmudbProto.SQLQueryResult> it;
private ImmudbProto.SQLQueryResult res;
private int currRow = -1;

private boolean closed;

public SQLQueryResult(ImmudbProto.SQLQueryResult res) {
if (res == null) {
public SQLQueryResult(Iterator<ImmudbProto.SQLQueryResult> it) {
if (it == null) {
throw new RuntimeException("illegal arguments");
}

this.res = res;
this.it = it;
this.res = it.next();
}

public synchronized void close() throws SQLException {
Expand All @@ -45,12 +49,18 @@ public synchronized boolean next() throws SQLException {
throw new SQLException("already closed");
}

if (currRow + 1 >= res.getRowsCount()) {
if (res != null && currRow+1 < res.getRowsCount()) {
currRow++;
return true;
}

try {
res = this.it.next();
} catch (NoSuchElementException e) {
return false;
}
currRow = 0;

currRow++;

return true;
}

Expand All @@ -76,7 +86,7 @@ public synchronized int getColumnsCount() throws SQLException {
if (closed) {
throw new SQLException("already closed");
}

return res.getColumnsCount();
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ message SQLQueryRequest {
string sql = 1;
repeated NamedParam params = 2;
bool reuseSnapshot = 3;
bool acceptStream = 4;
}

message NamedParam {
Expand Down Expand Up @@ -783,7 +784,7 @@ service ImmuService {
rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){};

rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {};
rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {};
rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {};

rpc Set (SetRequest) returns (TxHeader){
};
Expand Down Expand Up @@ -894,7 +895,7 @@ service ImmuService {
rpc SQLExec(SQLExecRequest) returns (SQLExecResult) {
};

rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) {
rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {
};

rpc ListTables(google.protobuf.Empty) returns (SQLQueryResult) {
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/codenotary/immudb4j/SQLTransactionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public void t1() throws VerificationException, InterruptedException, SQLExceptio
new SQLValue(String.format("title%d", i)),
new SQLValue(i % 2 == 0));
}
immuClient.commitTransaction();

immuClient.beginTransaction();

SQLQueryResult res = immuClient.sqlQuery("SELECT id, title, active FROM mytable");

Expand Down Expand Up @@ -71,8 +74,6 @@ public void t1() throws VerificationException, InterruptedException, SQLExceptio
res.close();

immuClient.commitTransaction();

immuClient.closeSession();
}

}

0 comments on commit d7c8357

Please sign in to comment.