Skip to content

Commit

Permalink
Complete implementation of mark and reset functionality on binary str…
Browse files Browse the repository at this point in the history
…eams
  • Loading branch information
TobiasHafner committed Aug 9, 2024
1 parent 2f1eabb commit a012e2f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 84 deletions.
130 changes: 51 additions & 79 deletions src/main/java/org/polypheny/jdbc/streaming/BinaryPrismInputStream.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.jdbc.streaming;

import java.io.IOException;
Expand Down Expand Up @@ -48,8 +32,7 @@ public class BinaryPrismInputStream extends InputStream {
private boolean isLast = false;
private boolean isClosed = false;


public BinaryPrismInputStream( int statementId, long streamId, boolean isForwardOnly, PolyConnection connection ) {
public BinaryPrismInputStream(int statementId, long streamId, boolean isForwardOnly, PolyConnection connection) {
this.connection = connection;
this.statementId = statementId;
this.streamId = streamId;
Expand All @@ -59,8 +42,7 @@ public BinaryPrismInputStream( int statementId, long streamId, boolean isForward
this.limit = NO_LIMIT;
}


public BinaryPrismInputStream( BinaryPrismInputStream other, long limit, long startPosition ) {
public BinaryPrismInputStream(BinaryPrismInputStream other, long limit, long startPosition) {
this.currentPosition = startPosition;
this.connection = other.connection;
this.statementId = other.statementId;
Expand All @@ -71,188 +53,178 @@ public BinaryPrismInputStream( BinaryPrismInputStream other, long limit, long st
this.limit = limit;
}


@Override
public int available() throws IOException {
long available = bufferStartPosition + buffer.length - currentPosition;
if ( limit != NO_LIMIT && limit < bufferStartPosition + buffer.length ) {
if (limit != NO_LIMIT && limit < bufferStartPosition + buffer.length) {
available = limit - currentPosition;
}
if ( available > Integer.MAX_VALUE ) {
if (available > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return Math.toIntExact( available );
return Math.toIntExact(available);
}


@Override
public void close() throws IOException {
// No need to communicate with the server as all streams are closed on statement closure.
isClosed = true;
}


@Override
public void mark( int readlimit ) {
public void mark(int readlimit) {
if (isForwardOnly) {
return; // Forward-only streams do not support mark
}
markPosition = currentPosition;
markReadLimit = readlimit;
}


@Override
public void reset() throws IOException {
if ( isForwardOnly ) {
throw new IOException( "This stream does not support mark and reset." );
if (isForwardOnly) {
throw new IOException("This stream does not support mark and reset.");
}
if ( markPosition == NO_MARK ) {
throw new IOException( "No mark set. Nothing to reset." );
if (markPosition == NO_MARK) {
throw new IOException("No mark set. Nothing to reset.");
}
if ( currentPosition - markPosition > markReadLimit ) {
throw new IOException( "Current position exceeds read limit set on mark. Nothing to reset." );
if (currentPosition - markPosition > markReadLimit) {
throw new IOException("Current position exceeds read limit set on mark. Nothing to reset.");
}
currentPosition = markPosition;
bufferStartPosition = markPosition;
fetchNextBytes(); // Refill the buffer from the marked position
}


@Override
public boolean markSupported() {
return !isForwardOnly;
}


private int getBufferReadPosition() {
return Math.toIntExact( currentPosition - bufferStartPosition );
return Math.toIntExact(currentPosition - bufferStartPosition);
}


@Override
public int read() throws IOException {
fetchIfEmpty();
if ( !hasMoreData() ) {
if (!hasMoreData()) {
return -1;
}
int bufferReadPosition = getBufferReadPosition();
currentPosition++;
return buffer[bufferReadPosition] & 0xFF;
}


private void fetchIfEmpty() throws IOException {
if ( available() <= 0 ) {
if (available() <= 0) {
fetchNextBytes();
}
}


private boolean hasMoreData() throws IOException {
return !isLast || available() > 0;
}


private void fetchNextBytes() throws IOException {
long fetchPosition = bufferStartPosition + buffer.length;
int timeout = connection.getTimeout();
StreamFrame frame;
try {
frame = connection.getPrismInterfaceClient().fetchStream( this.statementId, this.streamId, fetchPosition, BUFFER_SIZE, timeout );
} catch ( PrismInterfaceServiceException e ) {
throw new IOException( e );
frame = connection.getPrismInterfaceClient().fetchStream(this.statementId, this.streamId, fetchPosition, BUFFER_SIZE, timeout);
} catch (PrismInterfaceServiceException e) {
throw new IOException(e);
}
this.isLast = frame.getIsLast();
this.bufferStartPosition = fetchPosition;
if ( frame.getDataCase() != DataCase.BINARY ) {
throw new RuntimeException( "Stream type must be binary." );
if (frame.getDataCase() != DataCase.BINARY) {
throw new RuntimeException("Stream type must be binary.");
}
this.buffer = frame.getBinary().toByteArray();
}


@Override
public int read( byte[] b ) throws IOException {
public int read(byte[] b) throws IOException {
fetchIfEmpty();
if ( !hasMoreData() ) {
if (!hasMoreData()) {
return 1;
}
int bytesCopied = 0;
while ( bytesCopied < b.length && hasMoreData() ) {
while (bytesCopied < b.length && hasMoreData()) {
int bufferReadPosition = getBufferReadPosition();
int bytesToCopy = Math.min( available(), b.length - bytesCopied );
System.arraycopy( this.buffer, bufferReadPosition, b, bytesCopied, bytesToCopy );
int bytesToCopy = Math.min(available(), b.length - bytesCopied);
System.arraycopy(this.buffer, bufferReadPosition, b, bytesCopied, bytesToCopy);
currentPosition += bytesToCopy;
bytesCopied += bytesToCopy;
fetchIfEmpty();
}
return bytesCopied;
}


private void reposition( long pos, int len ) throws IOException {
if ( pos < bufferStartPosition && isForwardOnly ) {
throw new IOException( "Can't access already returned section of a forward only stream." );
private void reposition(long pos, int len) throws IOException {
if (pos < bufferStartPosition && isForwardOnly) {
throw new IOException("Can't access already returned section of a forward-only stream.");
}

if ( pos < currentPosition ) {
if ( pos + len < markPosition ) {
if (pos < currentPosition) {
if (pos + len < markPosition) {
markPosition = NO_MARK;
}
currentPosition = pos;
if ( !isForwardOnly ) {
if (!isForwardOnly) {
bufferStartPosition = pos;
isLast = false;
fetchNextBytes();
}
return;
}
skip( pos - currentPosition );
skip(pos - currentPosition);
}


public byte[] getBytes( long pos, int len ) throws IOException {
public byte[] getBytes(long pos, int len) throws IOException {
byte[] bytes = new byte[len];
int bytesCopied = 0;

reposition( pos, len );
reposition(pos, len);

while ( bytesCopied < len && hasMoreData() ) {
while (bytesCopied < len && hasMoreData()) {
int bufferReadPosition = getBufferReadPosition();
int bytesToCopy = Math.min( available(), len - bytesCopied );
System.arraycopy( this.buffer, bufferReadPosition, bytes, bytesCopied, bytesToCopy );
int bytesToCopy = Math.min(available(), len - bytesCopied);
System.arraycopy(this.buffer, bufferReadPosition, bytes, bytesCopied, bytesToCopy);
currentPosition += bytesToCopy;
bytesCopied += bytesToCopy;
fetchIfEmpty();
}
return bytes;
}


@Override
public int read( byte[] b, int off, int len ) throws IOException {
public int read(byte[] b, int off, int len) throws IOException {
fetchIfEmpty();
if (!hasMoreData() ) {
if (!hasMoreData()) {
return -1;
}
int bytesCopied = 0;
while ( bytesCopied < len && hasMoreData() ) {
while (bytesCopied < len && hasMoreData()) {
int bufferReadPosition = getBufferReadPosition();
int bytesToCopy = Math.min( available(), len - bytesCopied );
System.arraycopy( this.buffer, bufferReadPosition, b, off + bytesCopied, bytesToCopy );
int bytesToCopy = Math.min(available(), len - bytesCopied);
System.arraycopy(this.buffer, bufferReadPosition, b, off + bytesCopied, bytesToCopy);
currentPosition += bytesToCopy;
bytesCopied += bytesToCopy;
fetchIfEmpty();
}
return bytesCopied;
}


@Override
public long skip( long n ) throws IOException {
public long skip(long n) throws IOException {
long bytesSkipped = 0;
while ( bytesSkipped < n && hasMoreData() ) {
long skipped = Math.min( n - bytesSkipped, available() );
while (bytesSkipped < n && hasMoreData()) {
long skipped = Math.min(n - bytesSkipped, available());
bytesSkipped += skipped;
currentPosition += skipped;
fetchIfEmpty();
}
return bytesSkipped;
}

}
5 changes: 0 additions & 5 deletions src/main/java/org/polypheny/jdbc/types/TypedValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -1424,11 +1424,6 @@ private ProtoValue serializeAsProtoDate() {


private ProtoValue serializeAsProtoString(StreamingIndex streamingIndex) {
//return ProtoUtils.serializeAsProtoString( varcharValue );




ProtoString protoString;
if ( varcharValue.length() * 2 < STREAMING_THRESHOLD ) {
protoString = ProtoString.newBuilder()
Expand Down

0 comments on commit a012e2f

Please sign in to comment.