[Java][Python] Avoid reusing VectorSchemaRoot when exporting an ArrowArrayStream to other languages #36443
Is it possible to share a self-contained reproduction? That said, I think what might be happening is that the Parquet writer may request more than one batch from the reader, and if you request to share roots, then the previous batch will be overwritten. That is, I would expect this to fail:

```python
reader = wrap_from_java_stream(...)
batch1 = reader.read_next_batch()
batch1.validate(full=True)  # OK
batch2 = reader.read_next_batch()
batch1.validate(full=True)  # Not OK because batch2 and batch1 share the same allocation
```

Exporting data via C Data does not copy the data, so it is your application's responsibility to properly manage the lifetime of the buffers. And Arrow Java uses mutable buffers, so if you enable reusing a VectorSchemaRoot, you'll find that reading new data invalidates previously read data.

(You may have to read more batches than that to get things to fail, but I hope the idea is clear.)
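To make the "mutable buffers" point concrete, here is a small standalone Java demonstration (not from the issue; class name and values are illustrative): when a vector is reset and refilled, the new batch lands in the same allocation, which is why anything still referencing that memory zero-copy sees the overwrite.

```java
import java.nio.charset.StandardCharsets;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;

public class MutableBufferDemo {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator();
         VarCharVector vector = new VarCharVector("utf8", allocator)) {
      // Fill "batch 1" and remember where its bytes live.
      vector.allocateNew(16, 1);
      vector.setSafe(0, "batch-1".getBytes(StandardCharsets.UTF_8));
      vector.setValueCount(1);
      long dataAddress = vector.getDataBuffer().memoryAddress();

      // "Reuse" the vector for the next batch: reset() keeps the existing
      // buffers, so the new value overwrites the old bytes in place.
      vector.reset();
      vector.setSafe(0, "batch-2".getBytes(StandardCharsets.UTF_8));
      vector.setValueCount(1);

      // Prints true: batch 2 lives at the same address batch 1 did, so a
      // zero-copy export of batch 1 would now show batch 2's data.
      System.out.println(dataAddress == vector.getDataBuffer().memoryAddress());
    }
  }
}
```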
That explains the failure in my code. Since Arrow Java prefers to populate data into the same root, and does so by default: do we have any documentation explaining why the Java implementation prefers the same root, and why the C Data stream is bound to a VectorSchemaRoot?
There just isn't a good interface for this in Java. Arrow Java was designed differently from the C++ implementation.
I suppose the C Data implementation should unload from the root when it exports.
We should fix this behavior in Java (and ensure it's tested) since it is a surprise.
Hi @hu6360567, sorry to join late. Could you help me validate whether this is working on your side, please? I'm able to create a Parquet file with data obtained from the database using the JDBC adapter, and then use the C Data Interface to read it from the Python side. Testing:
Java producer of the Arrow reader:

```java
package org.example.cdata;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.ibatis.jdbc.ScriptRunner;
public class JavaReaderApi {
  final static BufferAllocator allocator = new RootAllocator();

  public static BufferAllocator getAllocatorForJavaConsumers() {
    return allocator;
  }

  public static ArrowReader getArrowReaderForJavaConsumers(int batchSize, boolean reuseVSR) {
    System.out.println("Java Parameters: BatchSize = " + batchSize + ", reuseVSR = " + reuseVSR);
    String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1";
    final Connection connection;
    try {
      connection = DriverManager.getConnection("jdbc:h2:mem:h2-jdbc-adapter");
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
    final ScriptRunner runnerDDLDML = new ScriptRunner(connection);
    runnerDDLDML.setLogWriter(null);
    try {
      runnerDDLDML.runScript(new BufferedReader(
          new FileReader("./src/main/resources/h2-ddl.sql")));
    } catch (FileNotFoundException e) {
      throw new RuntimeException(e);
    }
    try {
      runnerDDLDML.runScript(new BufferedReader(
          new FileReader("./src/main/resources/h2-dml.sql")));
    } catch (FileNotFoundException e) {
      throw new RuntimeException(e);
    }
    final JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
        JdbcToArrowUtils.getUtcCalendar())
        .setTargetBatchSize(batchSize)
        .setReuseVectorSchemaRoot(reuseVSR)
        .setArraySubTypeByColumnNameMap(
            new HashMap<String, JdbcFieldInfo>() {{
              put("LIST_FIELD19",
                  new JdbcFieldInfo(Types.INTEGER));
            }}
        )
        .build();
    final ResultSet resultSetConvertToParquet;
    try {
      resultSetConvertToParquet = connection.createStatement().executeQuery(query);
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
    final ArrowVectorIterator arrowVectorIterator;
    try {
      arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(
          resultSetConvertToParquet, config);
    } catch (SQLException | IOException e) {
      throw new RuntimeException(e);
    }
    // get jdbc row data as an arrow reader
    final ArrowReader arrowReader = new JDBCReader(allocator, arrowVectorIterator, config);
    return arrowReader;
  }
}
class JDBCReader extends ArrowReader {
  private final ArrowVectorIterator iter;
  private final JdbcToArrowConfig config;
  private VectorSchemaRoot root;
  private boolean firstRoot = true;

  public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) {
    super(allocator);
    this.iter = iter;
    this.config = config;
  }

  @Override
  public boolean loadNextBatch() throws IOException {
    if (firstRoot) {
      firstRoot = false;
      return true;
    } else {
      if (iter.hasNext()) {
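        // Advance to the next batch: without reuse, the previous root owns its
        // buffers and is closed here; with reuse, the iterator repopulates the
        // same root after its buffers are reallocated.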
        if (root != null && !config.isReuseVectorSchemaRoot()) {
          root.close();
        } else {
          root.allocateNew();
        }
        root = iter.next();
        return root.getRowCount() != 0;
      } else {
        return false;
      }
    }
  }

  @Override
  public long bytesRead() {
    return 0;
  }

  @Override
  protected void closeReadSource() throws IOException {
    if (root != null && !config.isReuseVectorSchemaRoot()) {
      root.close();
    }
  }

  @Override
  protected Schema readSchema() throws IOException {
    return null;
  }

  @Override
  public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
    if (root == null) {
      root = iter.next();
    }
    return root;
  }
}
```

Python side:

```python
import jpype
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds
import sys
from pyarrow.cffi import ffi
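# Import the ArrowArrayStream that Java populated and re-wrap it as a
# pyarrow.RecordBatchReader; the generator yields the schema first, then the
# batches, so the imported reader stays open for the whole iteration.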
def getRecordBatchReader(py_stream_ptr):
    generator = getIterableRecordBatchReader(py_stream_ptr)
    schema = next(generator)
    return pa.RecordBatchReader.from_batches(schema, generator)

def getIterableRecordBatchReader(py_stream_ptr):
    with pa.RecordBatchReader._import_from_c(py_stream_ptr) as reader:
        yield reader.schema
        yield from reader
# batchSize = int(sys.argv[1]), reuseVSR = eval(sys.argv[2]), mode log|parquet|csv = str(sys.argv[3])
jpype.startJVM(classpath=[
    "./target/java-python-by-cdata-1.0-SNAPSHOT-jar-with-dependencies.jar"])
java_reader_api = jpype.JClass('org.example.cdata.JavaReaderApi')
java_c_package = jpype.JPackage("org").apache.arrow.c
py_stream = ffi.new("struct ArrowArrayStream*")
py_stream_ptr = int(ffi.cast("uintptr_t", py_stream))
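# Wrap the raw struct pointer for Java; exportArrayStream below fills it with
# producer callbacks, so no data is copied across the language boundary.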
java_wrapped_stream = java_c_package.ArrowArrayStream.wrap(py_stream_ptr)
# get reader data exported into memoryAddress
print('Python Parameters: BatchSize = ' + sys.argv[1] + ', reuseVSR = ' +
      sys.argv[2])
java_c_package.Data.exportArrayStream(
    java_reader_api.getAllocatorForJavaConsumers(),
    java_reader_api.getArrowReaderForJavaConsumers(int(sys.argv[1]),
                                                   eval(sys.argv[2])),
    java_wrapped_stream)
with getRecordBatchReader(py_stream_ptr) as streamsReaderForJava:
    # print logs
    if str(sys.argv[3]) == 'log':
        for batch in streamsReaderForJava:
            print(batch.num_rows)
            print(batch.num_columns)
            print(batch.to_pylist())
    # create parquet file
    elif str(sys.argv[3]) == 'parquet':
        ds.write_dataset(streamsReaderForJava,
                         './jdbc/parquet',
                         format="parquet")
    # create csv file
    elif str(sys.argv[3]) == 'csv':
        with csv.CSVWriter('./jdbc/csv',
                           streamsReaderForJava.schema) as writer:
            for record_batch in streamsReaderForJava:
                writer.write_batch(record_batch)
    else:
        print('Invalid parameter. Values supported are: {log, parquet, csv}')
```

Java POM:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>java-python-by-cdata</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>java-python-by-cdata</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <arrow.version>12.0.0</arrow.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-vector</artifactId>
      <version>${arrow.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-memory-netty</artifactId>
      <version>${arrow.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-jdbc</artifactId>
      <version>${arrow.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-dataset</artifactId>
      <version>${arrow.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.ibatis</groupId>
      <artifactId>ibatis-core</artifactId>
      <version>3.0</version>
    </dependency>
    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <version>2.1.214</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.6.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>org.example.cdata.JavaReaderApi</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
DML/DDL
Alternatively, you might want to consider https://github.com/davisusanibar/java-python-by-cdata
@davisusanibar the 'right' fix is probably to unload the root when exporting and to make exports work in terms of ArrowRecordBatch instead of VectorSchemaRoot
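As a sketch of that direction (an illustrative snapshot helper, not the actual change that would land inside Arrow Java's C Data module): `VectorUnloader.getRecordBatch()` bumps the reference count of every buffer in the root, so the resulting `ArrowRecordBatch` stays valid after the root is closed or reallocated. Note the buffers are shared rather than copied, so a producer that overwrites them in place (as a reused root can) would still clobber the snapshot, which is why reuse itself remains problematic.

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

final class RootSnapshots {
  // Detach the current contents of a root into an independent root. The
  // unloaded ArrowRecordBatch retains each buffer, and VectorLoader
  // re-references those buffers into the new root, so closing the batch
  // (and later the source root) leaves the snapshot's data alive.
  static VectorSchemaRoot snapshot(VectorSchemaRoot source, BufferAllocator allocator) {
    try (ArrowRecordBatch batch = new VectorUnloader(source).getRecordBatch()) {
      VectorSchemaRoot target = VectorSchemaRoot.create(source.getSchema(), allocator);
      new VectorLoader(target).load(batch);
      return target;
    }
  }
}
```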
Yes, that is my second step. First I want to validate that it is working; then I can create a new issue for that enhancement.
I have a doubt about how to close resources properly: all of them stay open, because none of them are inside try-with-resources.
It would generally look like:

```java
try (final ArrowRecordBatch batch = ...) {
  exportBatch(batch); // increments reference count
}
// batch is not freed here unless exportBatch threw (but the reference count is decreased)
// when Python frees the arrow::RecordBatch, the C Data callback will
// decrement the reference count and actually free the batch
```

The only problem is the lifetime of the allocator.
It cannot use a very tightly lifetime-scoped allocator.
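In practice that is why the example above keeps a static `RootAllocator` in `JavaReaderApi`. A minimal sketch of the pattern (`ExportScope` is a hypothetical holder; `close()` stands in for whatever hook signals that the Python side is done):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// The allocator must outlive every buffer exported over the C Data Interface,
// so it lives in a wide scope instead of a per-export try-with-resources.
final class ExportScope implements AutoCloseable {
  private final BufferAllocator allocator = new RootAllocator();

  BufferAllocator allocator() {
    return allocator;
  }

  @Override
  public void close() {
    // Only safe once the foreign consumer has released all imported batches;
    // closing a RootAllocator with outstanding buffers throws
    // IllegalStateException instead of silently freeing shared memory.
    allocator.close();
  }
}
```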
Just created this PR for the cookbook; I would appreciate it if you could help me validate the notes it mentions.
Hi, I'm new to Arrow, and I've encountered a similar issue where I need to convert an ArrowVectorIterator to an ArrowArrayStream. Do you have any good suggestions on how to approach this? Thanks for your advice.
Describe the bug, including details regarding any error messages, version, and platform.
I'm trying to import/export data to a database in Python through an ArrowArrayStream over pyarrow.jvm and JDBC. In order to export an ArrowVectorIterator as a stream without unloading it to a RecordBatch on the Java side before the export, I wrap the ArrowVectorIterator into an ArrowReader as below.
When the ArrowVectorIterator uses a config with reuseVectorSchemaRoot enabled, the utf8 array may be corrupted on the Python side, even though it works as expected on the Java side. Java code as below.
On the Python side, the situation is hard to explain.
The exported stream from Java is wrapped into a RecordBatchReader and written into different file formats.
For CSV, it works as expected.
For Parquet, writing with the dataset API as below
OR
In order to work out which record raises the error, the RecordBatchReader is re-wrapped with a smaller batch size and the content is logged as below:
Although the logger can print the slice, write_dataset fails.
For the arrow/feather format, it seems to write the record_batch directly into files, but the record_batch is invalid when read back from the file (code is similar to the above).
Then, if I create the ArrowVectorIteratorReader without reuseVectorSchemaRoot, everything works fine on the Python side.
Component(s)
Java, Python