[Java] Document how to convert JDBC Adapter result into a Parquet file #316

Open

wants to merge 5 commits into base: main
Changes from 2 commits
90 changes: 89 additions & 1 deletion java/source/dataset.rst
@@ -533,4 +533,92 @@ Let's read a CSV file.
Salesforce Slack 27.7 01/12/2020
Total batch size: 3

.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html


Write Parquet Files
===================

Member:

Can we move this to io.rst? That's where "Read parquet" is.

Contributor Author:

> Can we move this to io.rst? That's where "Read parquet" is.

Currently, io.rst redirects to dataset.rst for reading Parquet.

What about adding a "Write Parquet" entry to io.rst that also redirects to dataset.rst for writing Parquet?

Member:

Hmm, I think it's actually better to put "Write Parquet" examples in io.rst. The dataset.rst examples are primarily for querying (reading) data.

Contributor Author:

Changed

Let's read an Arrow file and write its data to a Parquet file.

.. testcode::

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.arrow.dataset.file.DatasetFileWriter;
    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.ScanOptions;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.Dataset;
    import org.apache.arrow.dataset.source.DatasetFactory;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.ipc.SeekableReadChannel;
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

    // read Arrow demo data
    Path uriRead = Paths.get("./thirdpartydeps/arrowfiles/random_access.arrow");
    try (
        BufferAllocator allocator = new RootAllocator();
        ArrowFileReader readerForDemoData = new ArrowFileReader(
            new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(
                Files.readAllBytes(uriRead))), allocator)
    ) {
        Path uriWrite = Files.createTempDirectory("parquet_");
        // write the data into a new Parquet file
        DatasetFileWriter.write(allocator, readerForDemoData, FileFormat.PARQUET, uriWrite.toUri().toString());
        // validate the data of the Parquet file just created
        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
        try (
            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator,
                NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader readerForFileCreated = scanner.scanBatches()
        ) {
            while (readerForFileCreated.loadNextBatch()) {
                System.out.print(readerForFileCreated.getVectorSchemaRoot().contentToTSVString());
                System.out.println("RowCount: " + readerForFileCreated.getVectorSchemaRoot().getRowCount());
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        // delete the temporary Parquet files created, on exit
        try (DirectoryStream<Path> dir = Files.newDirectoryStream(uriWrite)) {
            uriWrite.toFile().deleteOnExit();
            for (Path path : dir) {
                path.toFile().deleteOnExit();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }

.. testoutput::

name age
David 10
Gladis 20
Juan 30
RowCount: 3
name age
Nidia 15
Alexa 20
Mara 15
RowCount: 3
name age
Raul 34
Jhon 29
Thomy 33
RowCount: 3
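
A related sketch, under the assumption that the two-argument ``ScanOptions(batchSize, Optional<String[]>)`` constructor is available: the freshly written Parquet directory can be re-scanned with a column projection so that validation only materializes the columns it needs. It reuses ``allocator``, ``uriWrite``, and the imports from the example above (plus ``java.util.Optional``).

.. code-block:: java

    // re-scan the Parquet directory written above, projecting only the "name" column
    ScanOptions projected = new ScanOptions(/*batchSize*/ 32768,
        java.util.Optional.of(new String[] {"name"}));
    try (
        DatasetFactory factory = new FileSystemDatasetFactory(allocator,
            NativeMemoryPool.getDefault(), FileFormat.PARQUET, uriWrite.toUri().toString());
        Dataset dataset = factory.finish();
        Scanner scanner = dataset.newScan(projected);
        ArrowReader readerForProjection = scanner.scanBatches()
    ) {
        while (readerForProjection.loadNextBatch()) {
            System.out.print(readerForProjection.getVectorSchemaRoot().contentToTSVString());
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }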
5 changes: 5 additions & 0 deletions java/source/demo/pom.xml
@@ -107,5 +107,10 @@
<artifactId>core</artifactId>
<version>0.11.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
</dependencies>
</project>
4 changes: 2 additions & 2 deletions java/source/flight.rst
@@ -287,7 +287,7 @@ Flight Client and Server
S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
C2: Client (Populate Data): Wrote 2 batches with 3 rows each
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false}
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}
C4: Client (Get Stream):
Client Received batch #1, Data:
name
@@ -299,7 +299,7 @@ Flight Client and Server
Manuel
Felipe
JJ
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false}
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully
92 changes: 92 additions & 0 deletions java/source/io.rst
@@ -579,3 +579,95 @@ Reading and writing dictionary-encoded data requires separately tracking the dic
Dictionary-encoded data recovered: [0, 3, 4, 5, 7]
Dictionary recovered: Dictionary DictionaryEncoding[id=666,ordered=false,indexType=Int(8, true)] [Andorra, Cuba, Grecia, Guinea, Islandia, Malta, Tailandia, Uganda, Yemen, Zambia]
Decoded data: [Andorra, Guinea, Islandia, Malta, Uganda]

Customize Logic to Read Dataset
===============================

Member:

Can we move this to jdbc.rst? I think it fits better there since it's directly applicable.

Contributor Author:

Just keep the steps needed to implement a data reader here, and reference the JDBC page as an example.

If you need to implement a custom dataset reader, consider extending the `ArrowReader`_ class.

The ArrowReader class can be extended as follows:

1. Write the logic to read the schema in ``readSchema()``.
2. If you do not want to define logic for reading the schema, you will need to override ``getVectorSchemaRoot()`` instead.
3. Once (1) or (2) is in place, implement ``loadNextBatch()`` to load each batch of data.
4. Finally, define the logic for releasing the underlying read source in ``closeReadSource()``.

For example, let's create a custom ``JDBCReader``.

.. code-block:: java

    import java.io.IOException;

    import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.types.pojo.Schema;

    class JDBCReader extends ArrowReader {
        private final ArrowVectorIterator iter;
        private final JdbcToArrowConfig config;
        private VectorSchemaRoot root;
        private boolean firstRoot = true;

        public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) {
            super(allocator);
            this.iter = iter;
            this.config = config;
        }

        @Override
        public boolean loadNextBatch() throws IOException {
            if (firstRoot) {
                // the first batch is already loaded by getVectorSchemaRoot()
                firstRoot = false;
                return true;
            }
            else {
                if (iter.hasNext()) {
                    // when the root is not reused, release the previous batch before loading the next one
                    if (root != null && !config.isReuseVectorSchemaRoot()) {
                        root.close();
                    }
                    root = iter.next();
                    return root.getRowCount() != 0;
                }
                else {
                    return false;
                }
            }
        }

        @Override
        public long bytesRead() {
            return 0;
        }

        @Override
        protected void closeReadSource() throws IOException {
            if (root != null && !config.isReuseVectorSchemaRoot()) {
                root.close();
            }
        }

        @Override
        protected Schema readSchema() throws IOException {
            return null;
        }

        @Override
        public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
            if (root == null) {
                root = iter.next();
            }
            return root;
        }
    }




.. _`ArrowReader`: https://arrow.apache.org/docs/java/reference/org/apache/arrow/vector/ipc/ArrowReader.html
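
To connect this back to the PR title, here is a usage sketch of how a ``JDBCReader`` like the one above can feed ``DatasetFileWriter`` to turn a JDBC result set into a Parquet file. The in-memory H2 URL, the ``person`` table, and the query are hypothetical placeholders for illustration; the Arrow calls follow the examples elsewhere in this PR.

.. code-block:: java

    import java.nio.file.Files;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
    import org.apache.arrow.adapter.jdbc.JdbcToArrow;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
    import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
    import org.apache.arrow.dataset.file.DatasetFileWriter;
    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    try (
        BufferAllocator allocator = new RootAllocator();
        // hypothetical in-memory H2 database containing a "person" table
        Connection connection = DriverManager.getConnection("jdbc:h2:mem:ex");
        ResultSet resultSet = connection.createStatement()
            .executeQuery("SELECT name, age FROM person")
    ) {
        JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(
            allocator, JdbcToArrowUtils.getUtcCalendar(), /*includeMetadata*/ false)
            .setTargetBatchSize(1024)
            .setReuseVectorSchemaRoot(true)
            .build();
        try (
            ArrowVectorIterator iterator =
                JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
            JDBCReader reader = new JDBCReader(allocator, iterator, config)
        ) {
            // stream the JDBC batches into a new Parquet file under a temporary directory
            DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET,
                Files.createTempDirectory("jdbc_to_parquet_").toUri().toString());
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }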