Skip to content

Commit

Permalink
Implement blob streaming correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
TobiasHafner committed Aug 2, 2024
1 parent 0d2151a commit 8bbbfa5
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 24 deletions.
34 changes: 32 additions & 2 deletions src/main/java/org/polypheny/jdbc/PrismInterfaceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.polypheny.jdbc;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -71,8 +72,10 @@
import org.polypheny.prism.StatementBatchResponse;
import org.polypheny.prism.StatementResponse;
import org.polypheny.prism.StatementResult;
import org.polypheny.prism.StreamAcknowledgement;
import org.polypheny.prism.StreamFetchRequest;
import org.polypheny.prism.StreamFrame;
import org.polypheny.prism.StreamSendRequest;
import org.polypheny.prism.TableType;
import org.polypheny.prism.TableTypesRequest;
import org.polypheny.prism.Type;
Expand Down Expand Up @@ -290,14 +293,41 @@ public Frame fetchResult( int statementId, int fetchSize, int timeout ) throws P
return rpc.fetchResult( fetchRequest, timeout );
}

public StreamFrame fetchStream(int statementId, long streamId, long position, int length, int timeout ) throws PrismInterfaceServiceException {

public StreamFrame fetchStream( int statementId, long streamId, long position, int length, int timeout ) throws PrismInterfaceServiceException {
StreamFetchRequest streamFetchRequest = StreamFetchRequest.newBuilder()
.setStatementId( statementId )
.setStreamId( streamId )
.setPosition( position )
.setLength( length )
.build();
return rpc.fetchStream(streamFetchRequest, timeout);
return rpc.fetchStream( streamFetchRequest, timeout );
}


public StreamAcknowledgement streamBinary( byte[] bytes, boolean is_last, int timeout ) throws PrismInterfaceServiceException {
StreamFrame frame = StreamFrame.newBuilder()
.setBinary( ByteString.copyFrom( bytes ) )
.setIsLast( is_last )
.build();
StreamSendRequest streamSendRequest = StreamSendRequest.newBuilder()
.setFrame( frame )
.build();
return rpc.stream( streamSendRequest, timeout );

}


public StreamAcknowledgement streamBinary( byte[] bytes, boolean is_last, int timeout, long streamId ) throws PrismInterfaceServiceException {
StreamFrame frame = StreamFrame.newBuilder()
.setBinary( ByteString.copyFrom( bytes ) )
.setIsLast( is_last )
.build();
StreamSendRequest streamSendRequest = StreamSendRequest.newBuilder()
.setFrame( frame )
.setStreamId( streamId )
.build();
return rpc.stream( streamSendRequest, timeout );
}


Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/polypheny/jdbc/RpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@
import org.polypheny.prism.StatementBatchResponse;
import org.polypheny.prism.StatementResponse;
import org.polypheny.prism.StatementResult;
import org.polypheny.prism.StreamAcknowledgement;
import org.polypheny.prism.StreamFetchRequest;
import org.polypheny.prism.StreamFrame;
import org.polypheny.prism.StreamSendRequest;
import org.polypheny.prism.TableTypesRequest;
import org.polypheny.prism.TableTypesResponse;
import org.polypheny.prism.TypesRequest;
Expand Down Expand Up @@ -474,4 +476,12 @@ CloseResultResponse closeResult( CloseResultRequest msg, int timeout ) throws Pr
req.setCloseResultRequest( msg );
return completeSynchronously( req, timeout ).getCloseResultResponse();
}


public StreamAcknowledgement stream( StreamSendRequest msg, int timeout ) throws PrismInterfaceServiceException {
Request.Builder req = newMessage();
req.setStreamSendRequest( msg );
return completeSynchronously( req, timeout ).getStreamAcknowledgement();
}

}
8 changes: 6 additions & 2 deletions src/main/java/org/polypheny/jdbc/types/PrismInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import org.polypheny.jdbc.PolyConnection;
import org.polypheny.jdbc.PrismInterfaceServiceException;
import org.polypheny.prism.StreamFrame;
import org.polypheny.prism.StreamFrame.DataCase;

public class PrismInputStream extends InputStream {

private static final long NO_MARK = -1;
private static final long NO_LIMIT = -1;
private static final int BUFFER_SIZE = 1000;
private static final int BUFFER_SIZE = 100001;

private final PolyConnection connection;
private final int statementId;
Expand Down Expand Up @@ -162,7 +163,10 @@ private void fetchNextBytes() throws IOException {
}
this.isLast = frame.getIsLast();
this.bufferStartPosition = fetchPosition;
this.buffer = frame.getData().toByteArray();
if (frame.getDataCase() != DataCase.BINARY) {
throw new RuntimeException("Stream type must be binary.");
}
this.buffer = frame.getBinary().toByteArray();
}


Expand Down
131 changes: 111 additions & 20 deletions src/main/java/org/polypheny/jdbc/types/TypedValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Date;
import java.sql.NClob;
import java.sql.Ref;
Expand All @@ -57,6 +56,7 @@
import lombok.Getter;
import lombok.Setter;
import org.polypheny.jdbc.PolyConnection;
import org.polypheny.jdbc.PrismInterfaceClient;
import org.polypheny.jdbc.PrismInterfaceErrors;
import org.polypheny.jdbc.PrismInterfaceServiceException;
import org.polypheny.jdbc.properties.DriverProperties;
Expand All @@ -83,6 +83,9 @@ public class TypedValue implements Convertible {

private static final long MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;

private static final int STREAMING_THRESHOLD = 100000000;
private static final int STREAMING_TIMEOUT = 100000;

private static final Set<ValueCase> customTypes = new HashSet<>( Arrays.asList(
ValueCase.DOCUMENT,
ValueCase.INTERVAL
Expand Down Expand Up @@ -1238,7 +1241,7 @@ private void deserialize() {
}


public ProtoValue serialize() throws SQLException {
public ProtoValue serialize( PrismInterfaceClient client ) throws SQLException {
switch ( valueCase ) {
case BOOLEAN:
return serializeAsProtoBoolean();
Expand All @@ -1263,31 +1266,47 @@ public ProtoValue serialize() throws SQLException {
case STRING:
return serializeAsProtoString();
case BINARY:
return serializeAsProtoBinary();
return serializeAsProtoBinary( client );
case NULL:
return serializeAsProtoNull();
case LIST:
return serializeAsProtoList();
return serializeAsProtoList( client );
case FILE:
return serializeAsProtoFile();
return serializeAsProtoFile( client );
case DOCUMENT:
return serializeAsProtoDocument();
}
throw new PrismInterfaceServiceException( PrismInterfaceErrors.DATA_TYPE_MISMATCH, "Failed to serialize unknown type: " + valueCase.name() );
}


private ProtoValue serializeAsProtoFile() throws SQLException {
private ProtoValue serializeAsProtoFile( PrismInterfaceClient client ) throws SQLException {
ProtoFile protoFile;
if ( blobValue.length() < STREAMING_THRESHOLD ) {
try {
protoFile = ProtoFile.newBuilder()
.setBinary( ByteString.copyFrom( collectByteStream( blobValue.getBinaryStream() ) ) )
.build();
return ProtoValue.newBuilder()
.setFile( protoFile )
.build();
} catch ( IOException e ) {
throw new RuntimeException( e );
}
}
long streamId;
try {
ProtoFile protoFile = ProtoFile.newBuilder()
.setBinary( ByteString.copyFrom( collectByteStream( blobValue.getBinaryStream() ) ) )
.build();
return ProtoValue.newBuilder()
.setFile( protoFile )
.build();
} catch ( IOException e ) {
throw new PrismInterfaceServiceException( PrismInterfaceErrors.STREAM_ERROR, "Failed to read bytes from blob." );
streamId = streamBinary( blobValue, client );
} catch ( PrismInterfaceServiceException e ) {
throw new RuntimeException( e );
}
protoFile = ProtoFile.newBuilder()
.setStreamId( streamId )
.setIsForwardOnly( true )
.build();
return ProtoValue.newBuilder()
.setFile( protoFile )
.build();
}


Expand All @@ -1310,10 +1329,10 @@ private ProtoValue serializeAsInterval() {
}


private ProtoValue serializeAsProtoList() throws SQLException {
private ProtoValue serializeAsProtoList( PrismInterfaceClient client ) throws SQLException {
List<ProtoValue> elements = new ArrayList<>();
for ( Object object : (Object[]) arrayValue.getArray() ) {
elements.add( TypedValue.fromObject( object ).serialize() );
elements.add( TypedValue.fromObject( object ).serialize( client ) );
}
ProtoList protoList = ProtoList.newBuilder()
.addAllValues( elements )
Expand Down Expand Up @@ -1406,16 +1425,87 @@ private ProtoValue serializeAsTimestamp() {
}


private ProtoValue serializeAsProtoBinary() {
ProtoBinary protoBinary = ProtoBinary.newBuilder()
.setBinary( ByteString.copyFrom( binaryValue ) )
private ProtoValue serializeAsProtoBinary( PrismInterfaceClient client ) {
ProtoBinary protoBinary;
if ( binaryValue.length < STREAMING_THRESHOLD ) {
protoBinary = ProtoBinary.newBuilder()
.setBinary( ByteString.copyFrom( binaryValue ) )
.build();
return ProtoValue.newBuilder()
.setBinary( protoBinary )
.build();
}
long streamId;
try {
streamId = streamBinary( binaryValue, client );
} catch ( PrismInterfaceServiceException e ) {
throw new RuntimeException( e );
}
protoBinary = ProtoBinary.newBuilder()
.setStreamId( streamId )
.setIsForwardOnly( true )
.build();
return ProtoValue.newBuilder()
.setBinary( protoBinary )
.build();
}


private long streamBinary( byte[] data, PrismInterfaceClient client ) throws PrismInterfaceServiceException {
int size = data.length;
int offset = 0;
long streamId = -1;

while ( offset < size ) {
int frameSize = Math.min( STREAMING_THRESHOLD, size - offset );
byte[] frameData = new byte[frameSize];
System.arraycopy( data, offset, frameData, 0, frameSize );
boolean isLast = (offset + frameSize) >= size;
if ( streamId == -1 ) {
streamId = client.streamBinary( frameData, isLast, STREAMING_TIMEOUT ).getStreamId();
continue;
}
client.streamBinary( frameData, isLast, STREAMING_TIMEOUT, streamId );
}
return streamId;
}


private long streamBinary( Blob data, PrismInterfaceClient client ) throws PrismInterfaceServiceException {
long size;
long offset = 0;
long streamId = -1;

try ( InputStream inputStream = data.getBinaryStream() ) {
size = data.length();
byte[] buffer = new byte[STREAMING_THRESHOLD];

while ( offset < size ) {
int bytesRead = inputStream.read( buffer, 0, STREAMING_THRESHOLD );
if ( bytesRead == -1 ) {
break; // End of stream reached
}

boolean isLast = (offset + bytesRead) >= size;
byte[] frameData = new byte[bytesRead];
System.arraycopy( buffer, 0, frameData, 0, bytesRead );

if ( streamId == -1 ) {
streamId = client.streamBinary( frameData, isLast, STREAMING_TIMEOUT ).getStreamId();
} else {
client.streamBinary( frameData, isLast, STREAMING_TIMEOUT, streamId );
}

offset += bytesRead;
}
} catch ( SQLException | IOException e ) {
throw new PrismInterfaceServiceException( "Error streaming binary data", e );
}

return streamId;
}


private ProtoValue serializeAsProtoNull() {
return ProtoValue.newBuilder()
.setNull( ProtoNull.newBuilder().build() )
Expand Down Expand Up @@ -1452,7 +1542,7 @@ private static BigDecimal getBigDecimal( ByteString unscaledValue, int scale ) {
private static Array getArray( ProtoValue value, PolyConnection polyConnection ) throws SQLException {
String baseType = value.getValueCase().name();
List<TypedValue> values = value.getList().getValuesList().stream()
.map( v -> new TypedValue(v, polyConnection ))
.map( v -> new TypedValue( v, polyConnection ) )
.collect( Collectors.toList() );
return new PolyArray( baseType, values );
}
Expand All @@ -1461,4 +1551,5 @@ private static Array getArray( ProtoValue value, PolyConnection polyConnection )
private static PolyInterval getInterval( ProtoInterval interval ) {
return new PolyInterval( interval.getMonths(), interval.getMilliseconds() );
}

}

0 comments on commit 8bbbfa5

Please sign in to comment.