-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Java] Document how to convert JDBC Adapter result into a Parquet file #316
base: main
Are you sure you want to change the base?
Changes from 4 commits
52a7034
7d448c0
b81312a
95edea4
991f40b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -307,3 +307,191 @@ values to the given scale. | |
102 true 100000000030.0000000 some char text [1,2] | ||
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 | ||
103 true 10000000003.0000000 some char text [1] | ||
|
||
Write ResultSet to Parquet File | ||
=============================== | ||
|
||
As an example, we will write the result set produced by the JDBC adapter into a Parquet file. | ||
|
||
.. testcode:: | ||
|
||
import java.io.BufferedReader; | ||
import java.io.FileReader; | ||
import java.io.IOException; | ||
import java.nio.file.DirectoryStream; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.sql.Connection; | ||
import java.sql.DriverManager; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.sql.Types; | ||
import java.util.HashMap; | ||
|
||
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; | ||
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; | ||
import org.apache.arrow.adapter.jdbc.JdbcToArrow; | ||
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; | ||
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; | ||
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; | ||
import org.apache.arrow.dataset.file.DatasetFileWriter; | ||
import org.apache.arrow.dataset.file.FileFormat; | ||
import org.apache.arrow.dataset.file.FileSystemDatasetFactory; | ||
import org.apache.arrow.dataset.jni.NativeMemoryPool; | ||
import org.apache.arrow.dataset.scanner.ScanOptions; | ||
import org.apache.arrow.dataset.scanner.Scanner; | ||
import org.apache.arrow.dataset.source.Dataset; | ||
import org.apache.arrow.dataset.source.DatasetFactory; | ||
import org.apache.arrow.memory.BufferAllocator; | ||
import org.apache.arrow.memory.RootAllocator; | ||
import org.apache.arrow.vector.VectorSchemaRoot; | ||
import org.apache.arrow.vector.ipc.ArrowReader; | ||
import org.apache.arrow.vector.types.pojo.Schema; | ||
import org.apache.ibatis.jdbc.ScriptRunner; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import ch.qos.logback.classic.Level; | ||
import ch.qos.logback.classic.Logger; | ||
|
||
class JDBCReader extends ArrowReader { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we somehow delete the duplicate code here and reuse the other one? Or combine the two? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only JDBC is maintaining this demo example now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Awesome! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain that we need this because writing a dataset takes an ArrowReader, so we have to adapt the JDBC ArrowVectorIterator to the ArrowReader interface |
||
private final ArrowVectorIterator iter; | ||
private final JdbcToArrowConfig config; | ||
private VectorSchemaRoot root; | ||
private boolean firstRoot = true; | ||
|
||
public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, JdbcToArrowConfig config) { | ||
super(allocator); | ||
this.iter = iter; | ||
this.config = config; | ||
} | ||
|
||
@Override | ||
public boolean loadNextBatch() throws IOException { | ||
if (firstRoot) { | ||
firstRoot = false; | ||
return true; | ||
} | ||
else { | ||
if (iter.hasNext()) { | ||
if (root != null && !config.isReuseVectorSchemaRoot()) { | ||
root.close(); | ||
} | ||
else { | ||
root.allocateNew(); | ||
} | ||
root = iter.next(); | ||
return root.getRowCount() != 0; | ||
} | ||
else { | ||
return false; | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public long bytesRead() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
protected void closeReadSource() throws IOException { | ||
if (root != null && !config.isReuseVectorSchemaRoot()) { | ||
root.close(); | ||
} | ||
} | ||
|
||
@Override | ||
protected Schema readSchema() throws IOException { | ||
return null; | ||
} | ||
|
||
@Override | ||
public VectorSchemaRoot getVectorSchemaRoot() throws IOException { | ||
if (root == null) { | ||
root = iter.next(); | ||
} | ||
return root; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add whitespace to the code below so it's organized into sections? I think it will be easier to read. |
||
|
||
((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we fiddling with loggers and adding logback to the example? I don't think we need any of that? |
||
try ( | ||
final BufferAllocator allocator = new RootAllocator(); | ||
final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE); | ||
final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE); | ||
final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0, | ||
Long.MAX_VALUE); | ||
final Connection connection = DriverManager.getConnection( | ||
"jdbc:h2:mem:h2-jdbc-adapter") | ||
) { | ||
ScriptRunner runnerDDLDML = new ScriptRunner(connection); | ||
runnerDDLDML.setLogWriter(null); | ||
runnerDDLDML.runScript(new BufferedReader( | ||
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))); | ||
runnerDDLDML.runScript(new BufferedReader( | ||
new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))); | ||
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC, | ||
JdbcToArrowUtils.getUtcCalendar()) | ||
.setTargetBatchSize(2) | ||
.setReuseVectorSchemaRoot(true) | ||
.setArraySubTypeByColumnNameMap( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the interest of keeping examples concise, let's use sample data that doesn't require us to deal with all of this in the first place. |
||
new HashMap<>() {{ | ||
put("LIST_FIELD19", | ||
new JdbcFieldInfo(Types.INTEGER)); | ||
}} | ||
) | ||
.build(); | ||
String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1"; | ||
try ( | ||
final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query); | ||
final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator( | ||
resultSetConvertToParquet, config) | ||
) { | ||
Path uri = Files.createTempDirectory("parquet_"); | ||
try ( | ||
// get jdbc row data as a arrow reader | ||
final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, config) | ||
) { | ||
// write arrow reader to parqueet file | ||
DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri.toUri().toString()); | ||
} | ||
// validate data of parquet file created | ||
ScanOptions options = new ScanOptions(/*batchSize*/ 32768); | ||
try ( | ||
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, | ||
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri.toUri().toString()); | ||
Dataset dataset = datasetFactory.finish(); | ||
Scanner scanner = dataset.newScan(options); | ||
ArrowReader reader = scanner.scanBatches() | ||
) { | ||
while (reader.loadNextBatch()) { | ||
System.out.print(reader.getVectorSchemaRoot().contentToTSVString()); | ||
System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount()); | ||
} | ||
} catch (Exception e) { | ||
e.printStackTrace(); | ||
throw new RuntimeException(e); | ||
} | ||
// delete temporary parquet file created | ||
try (DirectoryStream<Path> dir = Files.newDirectoryStream(uri)) { | ||
uri.toFile().deleteOnExit(); | ||
for (Path path : dir) { | ||
path.toFile().deleteOnExit(); | ||
} | ||
} | ||
} | ||
runnerDDLDML.closeConnection(); | ||
} catch (SQLException | IOException e) { | ||
e.printStackTrace(); | ||
throw new RuntimeException(e); | ||
} | ||
|
||
.. testoutput:: | ||
|
||
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 | ||
101 true 1000000000300 some char text [1,2,3] | ||
102 true 100000000030 some char text [1,2] | ||
RowCount: 2 | ||
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19 | ||
103 true 10000000003 some char text [1] | ||
RowCount: 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a comment describing what's in this file? Looks like it's three row groups of 3 rows each based on the output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added