[Java] Document how to convert JDBC Adapter result into a Parquet file #316

Open · wants to merge 5 commits into base: main
2 changes: 1 addition & 1 deletion java/source/dataset.rst
@@ -533,4 +533,4 @@ Let's read a CSV file.
Salesforce Slack 27.7 01/12/2020
Total batch size: 3

.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
5 changes: 5 additions & 0 deletions java/source/demo/pom.xml
@@ -107,5 +107,10 @@
<artifactId>core</artifactId>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
</dependencies>
</project>
108 changes: 106 additions & 2 deletions java/source/io.rst
@@ -263,6 +263,93 @@ Write - Out to Buffer

Number of rows written: 3

Write Parquet Files
*******************

Let's read an Arrow file and write its data to a Parquet file.

.. testcode::

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

// read Arrow demo data: three row groups, each consisting of three rows
Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
Member: Should we add a comment describing what's in this file? Looks like it's three row groups of 3 rows each based on the output.

Contributor Author: Added

try (
BufferAllocator allocator = new RootAllocator();
ArrowFileReader readerForDemoData = new ArrowFileReader(
new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
Files.readAllBytes(uriRead))), allocator)
) {
Path uriWrite = Files.createTempDirectory("parquet_");
// write data for new parquet file
DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
// validate data of parquet file just created
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader readerForFileCreated = scanner.scanBatches()
) {
while (readerForFileCreated.loadNextBatch()) {
System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
// delete temporary parquet file created
try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
uriWrite.toFile().deleteOnExit();
for (Path path : dir) {
path.toFile().deleteOnExit();
}
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}

.. testoutput::

name age
David 10
Gladis 20
Juan 30
RowCount: 3
name age
Nidia 15
Alexa 20
Mara 15
RowCount: 3
name age
Raul 34
Jhon 29
Thomy 33
RowCount: 3

Reading
=======

@@ -461,8 +548,8 @@ Reading Parquet File

Please check :doc:`Dataset <./dataset>`

Handling Data with Dictionaries
*******************************
Reading Data with Dictionaries
******************************

Reading and writing dictionary-encoded data requires separately tracking the dictionaries.

@@ -579,3 +666,20 @@ Reading and writing dictionary-encoded data requires separately tracking the dic
Dictionary-encoded data recovered: [0, 3, 4, 5, 7]
Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]

Reading Custom Dataset
**********************

If you need to implement a custom dataset reader, consider extending the `ArrowReader`_ class.

The ``ArrowReader`` class can be extended as follows (a minimal skeleton is sketched after this list):

1. Implement ``readSchema()`` with the logic that reads the schema of your data source.
2. If you do not implement ``readSchema()``, override ``getVectorSchemaRoot()`` instead.
3. Once (1) or (2) is in place, implement ``loadNextBatch()`` to load each batch of data.
4. Finally, implement ``closeReadSource()`` to close the underlying read source.
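A minimal skeleton of such a reader is sketched below. This is only a sketch: the class name ``MyCustomReader`` and the placeholder comments are illustrative and are not part of the Arrow API.

.. code-block:: java

    import java.io.IOException;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.types.pojo.Schema;

    class MyCustomReader extends ArrowReader {

        public MyCustomReader(BufferAllocator allocator) {
            super(allocator);
        }

        @Override
        protected Schema readSchema() throws IOException {
            // (1) Return the schema of your data source, or leave this
            // unimplemented and override getVectorSchemaRoot() instead (2).
            throw new UnsupportedOperationException("TODO: read the schema");
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            // (3) Populate the VectorSchemaRoot with the next batch of data
            // and return true; return false once the source is exhausted.
            return false;
        }

        @Override
        public long bytesRead() {
            // Optionally report how many bytes have been read so far.
            return 0;
        }

        @Override
        protected void closeReadSource() throws IOException {
            // (4) Release the underlying resource (file, socket, iterator, ...).
        }
    }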

You can see a complete example of a custom JDBC reader at :doc:`Write ResultSet to Parquet File <./jdbc>`.

.. _`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html
188 changes: 188 additions & 0 deletions java/source/jdbc.rst
@@ -307,3 +307,191 @@ values to the given scale.
102 true 100000000030.0000000 some char text [1,2]
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
103 true 10000000003.0000000 some char text [1]

Write ResultSet to Parquet File
===============================

This example writes the results obtained through the JDBC adapter into a Parquet file. Because ``DatasetFileWriter.write`` expects an ``ArrowReader``, the example first defines a ``JDBCReader`` class that adapts the JDBC ``ArrowVectorIterator`` to the ``ArrowReader`` interface.

.. testcode::

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.ibatis.jdbc.ScriptRunner;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

class JDBCReader extends ArrowReader {
Member: Could we somehow delete the duplicate code here and reuse the other one? Or combine the two?

Contributor Author: Only JDBC is maintaining this demo example now.

Member: Awesome!

Member: Explain that we need this because writing a dataset takes an ArrowReader, so we have to adapt the JDBC ArrowVectorIterator to the ArrowReader interface.
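
// DatasetFileWriter.write expects an ArrowReader, so this class adapts the
// JDBC ArrowVectorIterator to the ArrowReader interface.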

private final ArrowVectorIterator iter;
private final JdbcToArrowConfig config;
private VectorSchemaRoot root;
private boolean firstRoot = true;

public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) {
super(allocator);
this.iter = iter;
this.config = config;
}

@Override
public boolean loadNextBatch() throws IOException {
if (firstRoot) {
firstRoot = false;
return true;
}
else {
if (iter.hasNext()) {
if (root != null && !config.isReuseVectorSchemaRoot()) {
root.close();
}
else {
root.allocateNew();
}
root = iter.next();
return root.getRowCount() != 0;
}
else {
return false;
}
}
}

@Override
public long bytesRead() {
return 0;
}

@Override
protected void closeReadSource() throws IOException {
if (root != null && !config.isReuseVectorSchemaRoot()) {
root.close();
}
}

@Override
protected Schema readSchema() throws IOException {
return null;
}

@Override
public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
if (root == null) {
root = iter.next();
}
return root;
}
}
Member: Could we add whitespace to the code below so it's organized into sections? I think it will be easier to read.


((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE);
Member: Why are we fiddling with loggers and adding logback to the example? I don't think we need any of that?

try (
final BufferAllocator allocator = new RootAllocator();
final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE);
final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE);
final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0,
Long.MAX_VALUE);
final Connection connection = DriverManager.getConnection(
"jdbc:h2:mem:h2-jdbc-adapter")
) {
ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql")));
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-dml.sql")));
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC,
JdbcToArrowUtils.getUtcCalendar())
.setTargetBatchSize(2)
.setReuseVectorSchemaRoot(true)
.setArraySubTypeByColumnNameMap(
Member: In the interest of keeping examples concise, let's use sample data that doesn't require us to deal with all of this in the first place.

new HashMap<>() {{
put("LIST_FIELD19",
new JdbcFieldInfo(Types.INTEGER));
}}
)
.build();
String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1";
try (
final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query);
final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSetConvertToParquet, config)
) {
Path uri = Files.createTempDirectory("parquet_");
try (
// expose the JDBC row data as an ArrowReader
final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, config)
) {
// write the ArrowReader data to a Parquet file
DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri.toUri().toString());
}
// validate data of parquet file created
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri.toUri().toString());
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
while (reader.loadNextBatch()) {
System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount());
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
// delete temporary parquet file created
try (DirectoryStream<Path> dir = Files.newDirectoryStream(uri)) {
uri.toFile().deleteOnExit();
for (Path path : dir) {
path.toFile().deleteOnExit();
}
}
}
runnerDDLDML.closeConnection();
} catch (SQLException | IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}

.. testoutput::

INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
101 true 1000000000300 some char text [1,2,3]
102 true 100000000030 some char text [1,2]
RowCount: 2
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
103 true 10000000003 some char text [1]
RowCount: 1
8 changes: 4 additions & 4 deletions java/source/substrait.rst
@@ -63,7 +63,7 @@ Here is an example of a Java program that queries a Parquet file:
import java.util.HashMap;
import java.util.Map;

static Plan queryTableNation() throws SqlParseException {
Plan queryTableNation() throws SqlParseException {
String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
"N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
@@ -72,7 +72,7 @@ Here is an example of a Java program that queries a Parquet file:
return plan;
}

static void queryDatasetThruSubstraitPlanDefinition() {
void queryDatasetThruSubstraitPlanDefinition() {
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
@@ -135,7 +135,7 @@ For example, we can join the nation and customer tables from the TPC-H benchmark
import java.util.HashMap;
import java.util.Map;

static Plan queryTableNationJoinCustomer() throws SqlParseException {
Plan queryTableNationJoinCustomer() throws SqlParseException {
String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
"ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
"GROUP BY n.n_name";
@@ -151,7 +151,7 @@ For example, we can join the nation and customer tables from the TPC-H benchmark
return plan;
}

static void queryTwoDatasetsThruSubstraitPlanDefinition() {
void queryTwoDatasetsThruSubstraitPlanDefinition() {
String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);