Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44065: [Java] Implement C Data Interface for RunEndEncodedVector #44241

Merged
merged 12 commits into from
Oct 17, 2024
1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,6 @@ def _temp_path():

generate_run_end_encoded_case()
.skip_tester('C#')
.skip_tester('Java')
.skip_tester('JS')
# TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
.skip_tester('nanoarrow')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public List<ArrowBuf> visit(ArrowType.Union type) {

@Override
public List<ArrowBuf> visit(ArrowType.RunEndEncoded type) {
throw new UnsupportedOperationException("Importing buffers for type: " + type);
return List.of();
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions java/c/src/main/java/org/apache/arrow/c/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ static String asString(ArrowType arrowType) {
return "+vl";
case LargeListView:
return "+vL";
case RunEndEncoded:
return "+r";
case NONE:
throw new IllegalArgumentException("Arrow type ID is NONE");
default:
Expand Down Expand Up @@ -321,6 +323,8 @@ static ArrowType asType(String format, long flags)
return new ArrowType.ListView();
case "+vL":
return new ArrowType.LargeListView();
case "+r":
return new ArrowType.RunEndEncoded();
default:
String[] parts = format.split(":", 2);
if (parts.length == 2) {
Expand Down
17 changes: 17 additions & 0 deletions java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.ListViewVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.RunEndEncodedVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
Expand Down Expand Up @@ -770,6 +771,22 @@ public void testStructVector() {
}
}

@Test
public void testRunEndEncodedVector() {
ViggoC marked this conversation as resolved.
Show resolved Hide resolved
try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) {
setVector(vector, List.of(1, 3), List.of(1, 2));
assertTrue(roundtrip(vector, RunEndEncodedVector.class));
}
}

@Test
public void testEmptyRunEndEncodedVector() {
try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) {
setVector(vector, List.of(), List.of());
assertTrue(roundtrip(vector, RunEndEncodedVector.class));
}
}

@Test
public void testExtensionTypeVector() {
ExtensionTypeRegistry.register(new UuidType());
Expand Down
14 changes: 14 additions & 0 deletions java/c/src/test/python/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,20 @@ def recreate_batch():
return reader.read_next_batch()

self.round_trip_record_batch(recreate_batch)

def test_runendencoded_array(self):
# empty vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([], [], pa.run_end_encoded(pa.int64(), pa.int64())))

# constant null vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([10], [None]))
# constant int vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([10], [10]))

# run end int vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], [1, 2, 1, None, 3]))
# run end string vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], ["1", "2", "1", None, "3"]))

if __name__ == '__main__':
unittest.main(verbosity=2)
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
Expand All @@ -50,6 +53,7 @@
* values vector of any type. There are no buffers associated with the parent vector.
*/
public class RunEndEncodedVector extends BaseValueVector implements FieldVector {

public static final FieldVector DEFAULT_VALUE_VECTOR = ZeroVector.INSTANCE;
public static final FieldVector DEFAULT_RUN_END_VECTOR = ZeroVector.INSTANCE;

Expand Down Expand Up @@ -203,6 +207,7 @@ public void clear() {
for (FieldVector v : getChildrenFromFields()) {
v.clear();
}
this.valueCount = 0;
}

/**
Expand Down Expand Up @@ -234,19 +239,6 @@ public MinorType getMinorType() {
return MinorType.RUNENDENCODED;
}

/**
* To transfer quota responsibility.
*
* @param allocator the target allocator
* @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new
* target vector of the same type.
*/
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(BufferAllocator)");
}

/**
* To transfer quota responsibility.
*
Expand Down Expand Up @@ -284,8 +276,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) {
*/
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(String, BufferAllocator, CallBack)");
return new TransferImpl(ref, allocator, callBack);
}

/**
Expand All @@ -299,8 +290,7 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB
*/
@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(Field, BufferAllocator, CallBack)");
return new TransferImpl(field, allocator, callBack);
}

/**
Expand All @@ -312,8 +302,156 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call
*/
@Override
public TransferPair makeTransferPair(ValueVector target) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support makeTransferPair(ValueVector)");
return new TransferImpl((RunEndEncodedVector) target);
}

private class TransferImpl implements TransferPair {

RunEndEncodedVector to;
TransferPair dataTransferPair;
TransferPair reeTransferPair;

public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new RunEndEncodedVector(name, allocator, field.getFieldType(), callBack));
}

public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
this(new RunEndEncodedVector(field, allocator, callBack));
}

public TransferImpl(RunEndEncodedVector to) {
this.to = to;
if (to.getRunEndsVector() instanceof ZeroVector) {
to.initializeChildrenFromFields(field.getChildren());
}
reeTransferPair = getRunEndsVector().makeTransferPair(to.getRunEndsVector());
dataTransferPair = getValuesVector().makeTransferPair(to.getValuesVector());
}

/**
* Transfer the vector data to another vector. The memory associated with this vector is
* transferred to the allocator of target vector for accounting and management purposes.
*/
@Override
public void transfer() {
to.clear();
dataTransferPair.transfer();
reeTransferPair.transfer();
if (valueCount > 0) {
to.setValueCount(valueCount);
}
clear();
}

/**
* Slice this vector at the desired index and length, then transfer the corresponding data to
* the target vector.
*
* @param startIndex start position of the split in source vector.
* @param length length of the split.
*/
@Override
public void splitAndTransfer(int startIndex, int length) {
to.clear();
if (length <= 0) {
return;
}

int physicalStartIndex = getPhysicalIndex(startIndex);
int physicalEndIndex = getPhysicalIndex(startIndex + length - 1);
int physicalLength = physicalEndIndex - physicalStartIndex + 1;
dataTransferPair.splitAndTransfer(physicalStartIndex, physicalLength);
FieldVector toRunEndsVector = to.runEndsVector;
if (startIndex == 0) {
if (((BaseIntVector) runEndsVector).getValueAsLong(physicalEndIndex) == length) {
reeTransferPair.splitAndTransfer(physicalStartIndex, physicalLength);
} else {
reeTransferPair.splitAndTransfer(physicalStartIndex, physicalLength - 1);
toRunEndsVector.setValueCount(physicalLength);
if (toRunEndsVector instanceof SmallIntVector) {
((SmallIntVector) toRunEndsVector).set(physicalEndIndex, length);
} else if (toRunEndsVector instanceof IntVector) {
((IntVector) toRunEndsVector).set(physicalEndIndex, length);
} else if (toRunEndsVector instanceof BigIntVector) {
((BigIntVector) toRunEndsVector).set(physicalEndIndex, length);
} else {
throw new IllegalArgumentException(
"Run-end vector and must be of type int with size 16, 32, or 64 bits.");
}
}
} else {
shiftRunEndsVector(
toRunEndsVector,
startIndex,
length,
physicalStartIndex,
physicalEndIndex,
physicalLength);
}
getTo().setValueCount(length);
}

private void shiftRunEndsVector(
ValueVector toRunEndVector,
int startIndex,
int length,
int physicalStartIndex,
int physicalEndIndex,
int physicalLength) {
toRunEndVector.setValueCount(physicalLength);
toRunEndVector.getValidityBuffer().setOne(0, toRunEndVector.getValidityBuffer().capacity());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this. Could you please take a look how other vectors are doing this for validity buffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ListVector, it slice the validityBuffer if startIndex % 8 == 0, and copy data one by one when the first bit starts from the middle of a byte.
But for run end encoded vector, the element of run end vector can never be null so I just set validity buffer of RunEndVector to 1.
What's your concern about this code? The memory of validity buffer should be reused when startIndex % 8 == 0, Or we should not set the bit beyond the physical length?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ViggoC not saying it's wrong, I mentioned I wasn't sure.

ArrowBuf fromRunEndBuffer = runEndsVector.getDataBuffer();
ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer();
int physicalLastIndex = physicalLength - 1;
if (toRunEndVector instanceof SmallIntVector) {
byte typeWidth = SmallIntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLastIndex; i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the loop go from [0, physicalLength) or [physicalStartIndex, physicalLastIndex)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it through [0, physicalLength - 1), and handle physicalLength - 1 separately

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but this loop is from [0, physicalLastIndex) - aren't we "crossing indices" here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

physicalLastIndex = physicalLength - 1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right. Sorry, the naming is confusing, IMO - physicalLastIndex feels like it should go with physicalStartIndex and not physicalLength

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the naming is kind of confusing, I renamed them, does it make sense for you now?

toRunEndBuffer.setShort(
(long) i * typeWidth,
fromRunEndBuffer.getShort((long) (i + physicalStartIndex) * typeWidth) - startIndex);
}
int lastEnd =
Math.min(
fromRunEndBuffer.getShort((long) physicalEndIndex * typeWidth) - startIndex,
length);
toRunEndBuffer.setShort((long) physicalLastIndex * typeWidth, lastEnd);
} else if (toRunEndVector instanceof IntVector) {
byte typeWidth = IntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLastIndex; i++) {
toRunEndBuffer.setInt(
(long) i * typeWidth,
fromRunEndBuffer.getInt((long) (i + physicalStartIndex) * typeWidth) - startIndex);
}
int lastEnd =
Math.min(
fromRunEndBuffer.getInt((long) physicalEndIndex * typeWidth) - startIndex, length);
toRunEndBuffer.setInt((long) physicalLastIndex * typeWidth, lastEnd);
} else if (toRunEndVector instanceof BigIntVector) {
byte typeWidth = BigIntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLastIndex; i++) {
toRunEndBuffer.setLong(
(long) i * typeWidth,
fromRunEndBuffer.getLong((long) (i + physicalStartIndex) * typeWidth) - startIndex);
}
long lastEnd =
Math.min(
fromRunEndBuffer.getLong((long) physicalEndIndex * typeWidth) - startIndex, length);
toRunEndBuffer.setLong((long) physicalLastIndex * typeWidth, lastEnd);
} else {
throw new IllegalArgumentException(
"Run-end vector and must be of type int with size 16, 32, or 64 bits.");
}
}

@Override
public ValueVector getTo() {
return to;
}

@Override
public void copyValueSafe(int from, int to) {
this.to.copyFrom(from, to, RunEndEncodedVector.this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this will just throw because we don't implement copyFrom, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

}
}

/**
Expand Down Expand Up @@ -568,6 +706,7 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
throw new UnsupportedOperationException(
"Run-end encoded vectors do not have any associated buffers.");
}
this.valueCount = fieldNode.getLength();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,10 +909,12 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) throws IOEx
variadicBufferIndices));
}

int nullCount = 0;
if (type instanceof ArrowType.Null) {
int nullCount;
if (type instanceof ArrowType.RunEndEncoded || type instanceof Union) {
nullCount = 0;
} else if (type instanceof ArrowType.Null) {
nullCount = valueCount;
} else if (!(type instanceof Union)) {
} else {
nullCount = BitVectorHelper.getNullCount(vectorBuffers.get(0), valueCount);
}
final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public Void visit(RunEndEncodedVector vector, Void value) {
if (runCount == 0) {
validateOrThrow(valueCount == 0, "Run end vector does not contain enough elements");
} else if (runCount > 0) {
double lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1);
long lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1);
validateOrThrow(
valueCount == lastEnd,
"Vector logic length not equal to the last end in run ends vector. Logical length %s, last end %s",
Expand Down
Loading
Loading