Skip to content

Commit

Permalink
Fix binary stream
Browse files Browse the repository at this point in the history
  • Loading branch information
TobiasHafner committed Aug 6, 2024
1 parent e14bde0 commit 3bb30f7
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public List<Long> execute( String namespaceName, String languageName, List<Strin
StatementBatchResponse response = callback.takeNext();
if ( statementId == NO_STATEMENT_ID ) {
statementId = response.getBatchId();
streamingIndex.update( statementId );
}
if ( response.getScalarsCount() == 0 ) {
continue;
Expand All @@ -138,12 +139,14 @@ public void prepare( String namespaceName, String languageName, String statement
if ( statement.contains( "?" ) ) {
PreparedStatementSignature signature = getPrismInterfaceClient().prepareIndexedStatement( namespaceName, languageName, statement, timeout );
statementId = signature.getStatementId();
streamingIndex.update( statementId );
isPrepared = true;
return;
}
if ( statement.contains( ":" ) ) {
org.polypheny.prism.PreparedStatementSignature signature = connection.getPrismInterfaceClient().prepareNamedStatement( namespaceName, languageName, statement, timeout );
statementId = signature.getStatementId();
streamingIndex.update( statementId );
isPrepared = true;
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean hasNext() {
@Override
public PolyRow next() {
if ( !hasNext() ) {
throw new NoSuchElementException( "There are no more documents" );
throw new NoSuchElementException( "There are no more rows" );
}
return rows.get( ++index );
}
Expand Down
32 changes: 16 additions & 16 deletions src/main/java/org/polypheny/jdbc/streaming/PrismInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PrismInputStream extends InputStream {

private static final long NO_MARK = -1;
private static final long NO_LIMIT = -1;
private static final int BUFFER_SIZE = 100001;
private static final int BUFFER_SIZE = 10000;

private final PolyConnection connection;
private final int statementId;
Expand All @@ -54,18 +54,19 @@ public PrismInputStream( int statementId, long streamId, boolean isForwardOnly,
this.statementId = statementId;
this.streamId = streamId;
this.isForwardOnly = isForwardOnly;
this.buffer = new byte[BUFFER_SIZE];
this.buffer = new byte[0];
this.bufferStartPosition = 0;
this.limit = NO_LIMIT;
}

public PrismInputStream(PrismInputStream other, long limit, long startPosition) {

public PrismInputStream( PrismInputStream other, long limit, long startPosition ) {
this.currentPosition = startPosition;
this.connection = other.connection;
this.statementId = other.statementId;
this.streamId = other.streamId;
this.isForwardOnly = other.isForwardOnly;
this.buffer = new byte[BUFFER_SIZE];
this.buffer = other.buffer.clone();
this.bufferStartPosition = other.bufferStartPosition;
this.limit = limit;
}
Expand Down Expand Up @@ -125,9 +126,8 @@ private int getBufferReadPosition() {

@Override
public int read() throws IOException {
try {
fetchIfEmpty();
} catch ( IOException e ) {
fetchIfEmpty();
if ( !hasMoreData() ) {
return -1;
}
int bufferReadPosition = getBufferReadPosition();
Expand All @@ -138,9 +138,6 @@ public int read() throws IOException {

private void fetchIfEmpty() throws IOException {
if ( available() <= 0 ) {
if ( isLast ) {
throw new IOException( "No more data." );
}
fetchNextBytes();
}
}
Expand All @@ -162,19 +159,18 @@ private void fetchNextBytes() throws IOException {
}
this.isLast = frame.getIsLast();
this.bufferStartPosition = fetchPosition;
if (frame.getDataCase() != DataCase.BINARY) {
throw new RuntimeException("Stream type must be binary.");
if ( frame.getDataCase() != DataCase.BINARY ) {
throw new RuntimeException( "Stream type must be binary." );
}
this.buffer = frame.getBinary().toByteArray();
}


@Override
public int read( byte[] b ) throws IOException {
try {
fetchIfEmpty();
} catch ( IOException e ) {
return -1;
fetchIfEmpty();
if ( !hasMoreData() ) {
return 1;
}
int bytesCopied = 0;
while ( bytesCopied < b.length && hasMoreData() ) {
Expand Down Expand Up @@ -230,6 +226,10 @@ public byte[] getBytes( long pos, int len ) throws IOException {

@Override
public int read( byte[] b, int off, int len ) throws IOException {
fetchIfEmpty();
if (!hasMoreData() ) {
return -1;
}
int bytesCopied = 0;
while ( bytesCopied < len && hasMoreData() ) {
int bufferReadPosition = getBufferReadPosition();
Expand Down
1 change: 0 additions & 1 deletion src/main/java/org/polypheny/jdbc/types/PolyBlob.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public PolyBlob( ProtoFile protoFile, PolyConnection connection ) {
binaryValue = protoFile.getBinary().toByteArray();
break;
case STREAM_ID:
//TODO: actually set statement id
prismInputStream = new PrismInputStream(protoFile.getStatementId(), protoFile.getStreamId(), protoFile.getIsForwardOnly(), connection );
}
}
Expand Down
33 changes: 26 additions & 7 deletions src/test/java/org/polypheny/jdbc/types/StreamingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -37,11 +43,11 @@ public class StreamingTest {
private static final String DROP_IF_STATEMENT = "DROP TABLE IF EXISTS file_table";
private static final String CREATE_STATEMENT = "CREATE TABLE file_table (id INT PRIMARY KEY, data FILE)";
private static final String INSERT_STATEMENT = "INSERT INTO file_table (id, data) VALUES (?, ?)";
private static final String QUERY = "SELECT file FROM file_table WHERE id = 0";
private static final String QUERY = "SELECT data FROM file_table WHERE id = 1";


@Test
public void simpleRelationalTest() throws InterruptedException {
public void simpleFileStreamingTest() throws InterruptedException {
try ( Connection connection = TestHelper.getConnection() ) {
if ( !connection.isWrapperFor( PolyConnection.class ) ) {
fail( "Driver must support unwrapping to PolyphenyConnection" );
Expand All @@ -53,23 +59,36 @@ public void simpleRelationalTest() throws InterruptedException {

polyStatement.prepare( "public", "sql", INSERT_STATEMENT );
byte[] expected = createTestData();

List<TypedValue> parameters = new ArrayList<>( 2 );
parameters.add( TypedValue.fromInteger( 1 ) );
parameters.add( TypedValue.fromBytes( expected ) );
Result result = polyStatement.executePrepared( parameters );
result.unwrap( ScalarResult.class );

//RelationalResult result = polyStatement.execute( "public", "sql", QUERY ).unwrap( RelationalResult.class );
//byte[] received = result.iterator().next().get( "data" ).asBytes();
RelationalResult result2 = polyStatement.execute( "public", "sql", QUERY ).unwrap( RelationalResult.class );
InputStream bis = result2.iterator().next().get( "data" ).asBlob().getBinaryStream();
byte[] received = collectStream( bis );

//Assertions.assertEquals( expected, received );
Assertions.assertTrue(true);
} catch ( SQLException e ) {
Assertions.assertArrayEquals( expected, received );
} catch ( SQLException | IOException e ) {
throw new RuntimeException( e );
}
}


private byte[] collectStream( InputStream inputStream ) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int frameLength;
byte[] frame = new byte[300 * 1024 * 1024];
while ( (frameLength = inputStream.read( frame, 0, frame.length )) != -1 ) {
buffer.write( frame, 0, frameLength );
}
buffer.flush();
return buffer.toByteArray();
}


private static byte[] createTestData() {
final int TOTAL_BYTES = 300 * 1024 * 1024;
final int SIZE = 4;
Expand Down

0 comments on commit 3bb30f7

Please sign in to comment.